aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2014-04-11 16:23:58 -0700
committer Chris Lu <chris.lu@gmail.com> 2014-04-11 16:23:58 -0700
commit 008aee0dc1932f75c86e52893044d9cd953ef405 (patch)
tree 979b04772948679684cd896435dd1b82c9d7bfe9
parent 7c82e2316b9f09c0195b35a1487473db153bdec1 (diff)
download seaweedfs-008aee0dc1932f75c86e52893044d9cd953ef405.tar.xz
seaweedfs-008aee0dc1932f75c86e52893044d9cd953ef405.zip
Add retrying logic to wait for other peers during cluster bootstrapping.
-rw-r--r-- go/topology/topology.go 13
-rw-r--r-- go/weed/weed_server/master_server.go 8
-rw-r--r-- go/weed/weed_server/raft_server.go 26
-rw-r--r-- go/weed/weed_server/raft_server_handlers.go 12
4 files changed, 44 insertions, 15 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d5af60cd8..6c5bde304 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -52,21 +52,26 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
}
func (t *Topology) IsLeader() bool {
- return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
+ if leader, e := t.Leader(); e == nil {
+ return leader == t.RaftServer.Name()
+ }
+ return false
}
-func (t *Topology) Leader() string {
+func (t *Topology) Leader() (string, error) {
l := ""
if t.RaftServer != nil {
l = t.RaftServer.Leader()
+ } else {
+ return "", errors.New("Raft Server not ready yet!")
}
if l == "" {
// We are a single node cluster, we are the leader
- return t.RaftServer.Name()
+ return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
}
- return l
+ return l, nil
}
func (t *Topology) loadConfiguration(configurationFile string) error {
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 63cd18546..286b90aca 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -77,12 +77,16 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
})
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ }
}
}
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index 6b03ea7fb..13889df96 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -10,6 +10,7 @@ import (
"github.com/goraft/raft"
"github.com/gorilla/mux"
"io/ioutil"
+ "math/rand"
"net/http"
"net/url"
"strings"
@@ -59,13 +60,28 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
// Join to leader if specified.
if len(s.peers) > 0 {
- glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
-
if !s.raftServer.IsLogEmpty() {
- glog.V(0).Infoln("Cannot join with an existing log")
+ glog.V(0).Infoln("Starting cluster with existing logs.")
} else {
- if err := s.Join(s.peers); err != nil {
- return nil
+ glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
+ time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+ firstJoinError := s.Join(s.peers)
+ if firstJoinError != nil {
+ glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
+ _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: "http://" + s.httpAddr,
+ })
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return nil
+ }
+ }
+ var err error
+ for err != nil {
+ glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...")
+ time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond)
+ err = s.Join(s.peers)
}
glog.V(0).Infoln("Joined cluster")
}
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 38943cc8d..66430fae7 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -40,10 +40,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
}
func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) {
- if s.topo.Leader() != "" {
+ if leader, e := s.topo.Leader(); e == nil {
//http.StatusMovedPermanently does not cause http POST following redirection
- glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.topo.Leader()+req.URL.Path)
- http.Redirect(w, req, "http://"+s.topo.Leader()+req.URL.Path, http.StatusMovedPermanently)
+ glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+leader+req.URL.Path)
+ http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusMovedPermanently)
} else {
glog.V(0).Infoln("Error: Leader Unknown")
http.Error(w, "Leader unknown", http.StatusInternalServerError)
@@ -53,7 +53,11 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request)
func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["IsLeader"] = s.topo.IsLeader()
- m["Leader"] = s.topo.Leader()
+ if leader, e := s.topo.Leader(); e == nil {
+ m["Leader"] = leader
+ } else {
+ m["Leader"] = ""
+ }
m["Peers"] = s.Peers()
writeJsonQuiet(w, r, m)
}