aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-03-15 23:03:49 -0700
committerChris Lu <chris.lu@gmail.com>2014-03-15 23:03:49 -0700
commit41143b3b78a1021111edc28a27f76ae2529f99a8 (patch)
tree5508f149f59363f5bdab3c16a23c02d57d4e310d /go
parentfb75fe852c5282cef436329bb0700d4d0e09f511 (diff)
downloadseaweedfs-41143b3b78a1021111edc28a27f76ae2529f99a8.tar.xz
seaweedfs-41143b3b78a1021111edc28a27f76ae2529f99a8.zip
toughen weedfs clustering, adding synchronizing max volume id among
peers in order to avoid the same volume id being assigned twice 1. moving raft.Server to topology 2. adding max volume id command for raft
Diffstat (limited to 'go')
-rw-r--r--go/topology/cluster_commands.go31
-rw-r--r--go/topology/topology.go27
-rw-r--r--go/topology/topology_event_handling.go4
-rw-r--r--go/weed/master.go2
-rw-r--r--go/weed/server.go15
-rw-r--r--go/weed/weed_server/master_server.go31
-rw-r--r--go/weed/weed_server/master_server_handlers.go36
-rw-r--r--go/weed/weed_server/raft_server.go30
-rw-r--r--go/weed/weed_server/raft_server_handlers.go10
9 files changed, 114 insertions, 72 deletions
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
new file mode 100644
index 000000000..dc0a40c8d
--- /dev/null
+++ b/go/topology/cluster_commands.go
@@ -0,0 +1,31 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "github.com/goraft/raft"
+)
+
+type MaxVolumeIdCommand struct {
+ MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+}
+
+func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+ return &MaxVolumeIdCommand{
+ MaxVolumeId: value,
+ }
+}
+
+func (c *MaxVolumeIdCommand) CommandName() string {
+ return "MaxVolumeId"
+}
+
+func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
+ topo := server.Context().(*Topology)
+ before := topo.GetMaxVolumeId()
+ topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
+
+ glog.V(0).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
+
+ return nil, nil
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d72879035..055d273cd 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "github.com/goraft/raft"
"io/ioutil"
"math/rand"
)
@@ -12,8 +13,6 @@ import (
type Topology struct {
NodeImpl
- IsLeader bool
-
collectionMap map[string]*Collection
pulse int64
@@ -27,6 +26,8 @@ type Topology struct {
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
+
+ RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
@@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
return t, err
}
+func (t *Topology) IsLeader() bool {
+ return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
+}
+
+func (t *Topology) Leader() string {
+ l := ""
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ }
+
+ if l == "" {
+ // We are a single node cluster, we are the leader
+ return t.RaftServer.Name()
+ }
+
+ return l
+}
+
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
- return vid.Next()
+ next := vid.Next()
+ go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+ return next
}
func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 710e7b2ae..7398ff9bf 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -10,7 +10,7 @@ import (
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() {
for {
- if t.IsLeader {
+ if t.IsLeader() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
@@ -19,7 +19,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
- if t.IsLeader {
+ if t.IsLeader() {
for _ = range c {
t.Vacuum(garbageThreshold)
}
diff --git a/go/weed/master.go b/go/weed/master.go
index 430d99ad9..2f0a58038 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -74,7 +74,7 @@ func runMaster(cmd *Command, args []string) bool {
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder)
+ raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
ms.SetRaftServer(raftServer)
}()
diff --git a/go/weed/server.go b/go/weed/server.go
index 8b67275e5..6047030df 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"strings"
+ "sync"
"time"
)
@@ -51,7 +52,7 @@ var (
volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
- volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+ volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
serverWhiteList []string
)
@@ -95,6 +96,12 @@ func runServer(cmd *Command, args []string) bool {
serverWhiteList = strings.Split(*serverWhiteListOption, ",")
}
+ var raftWaitForMaster sync.WaitGroup
+ var volumeWait sync.WaitGroup
+
+ raftWaitForMaster.Add(1)
+ volumeWait.Add(1)
+
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder,
@@ -109,21 +116,25 @@ func runServer(cmd *Command, args []string) bool {
}
go func() {
+ raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond)
var peers []string
if *serverPeers != "" {
peers = strings.Split(*serverPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder)
+ raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
ms.SetRaftServer(raftServer)
+ volumeWait.Done()
}()
+ raftWaitForMaster.Done()
e := masterServer.ListenAndServe()
if e != nil {
glog.Fatalf("Fail to start master:%s", e)
}
}()
+ volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
weed_server.NewVolumeServer(r, VERSION, *serverIp, *volumePort, *volumePublicUrl, folders, maxCounts,
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 606884283..ac28e678d 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -25,11 +25,10 @@ type MasterServer struct {
whiteList []string
version string
- topo *topology.Topology
+ Topo *topology.Topology
vg *replication.VolumeGrowth
vgLock sync.Mutex
- raftServer *RaftServer
bounedLeaderChan chan int
}
@@ -52,7 +51,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
var e error
- if ms.topo, e = topology.NewTopology("topo", confFile, seq,
+ if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}
@@ -70,42 +69,36 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
r.HandleFunc("/{filekey}", ms.redirectHandler)
- ms.topo.StartRefreshWritableVolumes(garbageThreshold)
+ ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
return ms
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.raftServer = raftServer
- ms.raftServer.raftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- ms.topo.IsLeader = ms.IsLeader()
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "becomes leader.")
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
})
- ms.topo.IsLeader = ms.IsLeader()
- if ms.topo.IsLeader {
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", "I am the leader!")
+ if ms.Topo.IsLeader() {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "is the leader.")
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
}
}
-func (ms *MasterServer) IsLeader() bool {
- return ms.raftServer == nil || ms.raftServer.IsLeader()
-}
-
func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
- if ms.IsLeader() {
+ if ms.Topo.IsLeader() {
f(w, r)
} else {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
- targetUrl, err := url.Parse("http://" + ms.raftServer.Leader())
+ targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()})
return
}
- glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader())
+ glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index e5c383747..1ea837a2c 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -19,7 +19,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
}
volumeId, err := storage.NewVolumeId(vid)
if err == nil {
- machines := ms.topo.Lookup(collection, volumeId)
+ machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
ret := []map[string]string{}
for _, dn := range machines {
@@ -54,23 +54,23 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
- if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
- if ms.topo.FreeSpace() <= 0 {
+ if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if ms.Topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
return
} else {
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
- if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
- if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.topo); err != nil {
+ if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.Topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
}
- fid, count, dn, err := ms.topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
+ fid, count, dn, err := ms.Topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -80,7 +80,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.topo.GetCollection(r.FormValue("collection"))
+ collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
if !ok {
writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"})
return
@@ -92,7 +92,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(r.FormValue("collection"))
}
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
@@ -111,7 +111,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
- ms.topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
+ ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024
writeJsonQuiet(w, r, m)
@@ -120,7 +120,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = ms.version
- m["Topology"] = ms.topo.ToMap()
+ m["Topology"] = ms.Topo.ToMap()
writeJsonQuiet(w, r, m)
}
@@ -130,7 +130,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold = ms.garbageThreshold
}
debug("garbageThreshold =", gcThreshold)
- ms.topo.Vacuum(gcThreshold)
+ ms.Topo.Vacuum(gcThreshold)
ms.dirStatusHandler(w, r)
}
@@ -139,10 +139,10 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
+ if ms.Topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.topo)
+ count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -160,7 +160,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = ms.version
- m["Volumes"] = ms.topo.ToVolumeMap()
+ m["Volumes"] = ms.Topo.ToVolumeMap()
writeJsonQuiet(w, r, m)
}
@@ -171,7 +171,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
debug("parsing error:", err, r.URL.Path)
return
}
- machines := ms.topo.Lookup("", volumeId)
+ machines := ms.Topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
@@ -181,9 +181,9 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
- if ms.IsLeader() {
+ if ms.Topo.IsLeader() {
submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port))
} else {
- submitForClientHandler(w, r, ms.raftServer.Leader())
+ submitForClientHandler(w, r, ms.Topo.RaftServer.Leader())
}
}
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index 14cd1461b..882bb23f9 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/topology"
"encoding/json"
"errors"
"fmt"
@@ -22,31 +23,35 @@ type RaftServer struct {
httpAddr string
version string
router *mux.Router
+ topo *topology.Topology
}
-func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string) *RaftServer {
+func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
s := &RaftServer{
version: version,
peers: peers,
httpAddr: httpAddr,
dataDir: dataDir,
router: r,
+ topo: topo,
}
if glog.V(4) {
raft.SetLogLevel(2)
}
+ raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
+
var err error
transporter := raft.NewHTTPTransporter("/cluster")
- s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, nil, "")
+ s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil
}
transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(1 * time.Second)
- s.raftServer.SetElectionTimeout(1500 * time.Millisecond)
+ s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond)
s.raftServer.Start()
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
@@ -86,25 +91,6 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin
return s
}
-func (s *RaftServer) Name() string {
- return s.raftServer.Name()
-}
-
-func (s *RaftServer) IsLeader() bool {
- return s.Leader() == s.raftServer.Name()
-}
-
-func (s *RaftServer) Leader() string {
- l := s.raftServer.Leader()
-
- if l == "" {
- // We are a single node cluster, we are the leader
- return s.raftServer.Name()
- }
-
- return l
-}
-
func (s *RaftServer) Peers() (members []string) {
peers := s.raftServer.Peers()
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 4de52bf0a..38943cc8d 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -40,10 +40,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
}
func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) {
- if s.Leader() != "" {
+ if s.topo.Leader() != "" {
//http.StatusMovedPermanently does not cause http POST following redirection
- glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.Leader()+req.URL.Path)
- http.Redirect(w, req, "http://"+s.Leader()+req.URL.Path, http.StatusMovedPermanently)
+ glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.topo.Leader()+req.URL.Path)
+ http.Redirect(w, req, "http://"+s.topo.Leader()+req.URL.Path, http.StatusMovedPermanently)
} else {
glog.V(0).Infoln("Error: Leader Unknown")
http.Error(w, "Leader unknown", http.StatusInternalServerError)
@@ -52,8 +52,8 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request)
func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["IsLeader"] = s.IsLeader()
- m["Leader"] = s.Leader()
+ m["IsLeader"] = s.topo.IsLeader()
+ m["Leader"] = s.topo.Leader()
m["Peers"] = s.Peers()
writeJsonQuiet(w, r, m)
}