aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/topology/cluster_commands.go31
-rw-r--r--go/topology/topology.go27
-rw-r--r--go/topology/topology_event_handling.go4
-rw-r--r--go/weed/master.go2
-rw-r--r--go/weed/server.go15
-rw-r--r--go/weed/weed_server/master_server.go31
-rw-r--r--go/weed/weed_server/master_server_handlers.go36
-rw-r--r--go/weed/weed_server/raft_server.go30
-rw-r--r--go/weed/weed_server/raft_server_handlers.go10
9 files changed, 114 insertions, 72 deletions
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
new file mode 100644
index 000000000..dc0a40c8d
--- /dev/null
+++ b/go/topology/cluster_commands.go
@@ -0,0 +1,31 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "github.com/goraft/raft"
+)
+
+// MaxVolumeIdCommand is a raft command carrying a single volume id. Its
+// Apply method hands MaxVolumeId to Topology.UpAdjustMaxVolumeId. The field
+// is JSON-encoded under the key "maxVolumeId".
+type MaxVolumeIdCommand struct {
+ MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+}
+
+// NewMaxVolumeIdCommand returns a MaxVolumeIdCommand whose MaxVolumeId is
+// set to value.
+func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+	cmd := new(MaxVolumeIdCommand)
+	cmd.MaxVolumeId = value
+	return cmd
+}
+
+// CommandName returns the fixed name "MaxVolumeId" for this command type.
+func (c *MaxVolumeIdCommand) CommandName() string {
+ return "MaxVolumeId"
+}
+
+// Apply reads the *Topology stored as the raft server's context, passes
+// c.MaxVolumeId to its UpAdjustMaxVolumeId, and logs the max volume id seen
+// before and after the call. It always returns (nil, nil). The type
+// assertion panics if the server context is not a *Topology.
+func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
+	t := server.Context().(*Topology)
+	previous := t.GetMaxVolumeId()
+	t.UpAdjustMaxVolumeId(c.MaxVolumeId)
+	glog.V(0).Infoln("max volume id", previous, "==>", t.GetMaxVolumeId())
+	return nil, nil
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d72879035..055d273cd 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "github.com/goraft/raft"
"io/ioutil"
"math/rand"
)
@@ -12,8 +13,6 @@ import (
type Topology struct {
NodeImpl
- IsLeader bool
-
collectionMap map[string]*Collection
pulse int64
@@ -27,6 +26,8 @@ type Topology struct {
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
+
+ RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
@@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
return t, err
}
+// IsLeader reports whether this node should act as leader: true when no
+// raft server is attached, otherwise true only when Leader() equals the
+// raft server's own name.
+func (t *Topology) IsLeader() bool {
+	if t.RaftServer == nil {
+		return true
+	}
+	return t.Leader() == t.RaftServer.Name()
+}
+
+// Leader returns the name of the current raft leader. It returns "" when no
+// raft server has been attached yet (leader unknown). When the raft server
+// reports no leader, this node's own name is returned.
+func (t *Topology) Leader() string {
+	if t.RaftServer == nil {
+		// There is nothing to ask yet, and calling Name() on a nil
+		// raft.Server would panic.
+		return ""
+	}
+
+	if l := t.RaftServer.Leader(); l != "" {
+		return l
+	}
+
+	// We are a single node cluster, we are the leader
+	return t.RaftServer.Name()
+}
+
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+// NextVolumeId returns the id that follows the current max volume id. When a
+// raft server is attached, a MaxVolumeIdCommand carrying the new id is
+// submitted through it in a background goroutine; the result of Do is
+// neither awaited nor checked.
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
- return vid.Next()
+	next := vid.Next()
+	if t.RaftServer != nil {
+		// RaftServer is nil until it is assigned (IsLeader treats nil as a
+		// valid state), so skip the raft command rather than dereference nil.
+		go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+	}
+	return next
}
func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 710e7b2ae..7398ff9bf 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -10,7 +10,7 @@ import (
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() {
for {
- if t.IsLeader {
+ if t.IsLeader() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
@@ -19,7 +19,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
- if t.IsLeader {
+ if t.IsLeader() {
for _ = range c {
t.Vacuum(garbageThreshold)
}
diff --git a/go/weed/master.go b/go/weed/master.go
index 430d99ad9..2f0a58038 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -74,7 +74,7 @@ func runMaster(cmd *Command, args []string) bool {
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder)
+ raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
ms.SetRaftServer(raftServer)
}()
diff --git a/go/weed/server.go b/go/weed/server.go
index 8b67275e5..6047030df 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"strings"
+ "sync"
"time"
)
@@ -51,7 +52,7 @@ var (
volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
- volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+ volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
serverWhiteList []string
)
@@ -95,6 +96,12 @@ func runServer(cmd *Command, args []string) bool {
serverWhiteList = strings.Split(*serverWhiteListOption, ",")
}
+ var raftWaitForMaster sync.WaitGroup
+ var volumeWait sync.WaitGroup
+
+ raftWaitForMaster.Add(1)
+ volumeWait.Add(1)
+
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder,
@@ -109,21 +116,25 @@ func runServer(cmd *Command, args []string) bool {
}
go func() {
+ raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond)
var peers []string
if *serverPeers != "" {
peers = strings.Split(*serverPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder)
+ raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
ms.SetRaftServer(raftServer)
+ volumeWait.Done()
}()
+ raftWaitForMaster.Done()
e := masterServer.ListenAndServe()
if e != nil {
glog.Fatalf("Fail to start master:%s", e)
}
}()
+ volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
weed_server.NewVolumeServer(r, VERSION, *serverIp, *volumePort, *volumePublicUrl, folders, maxCounts,
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 606884283..ac28e678d 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -25,11 +25,10 @@ type MasterServer struct {
whiteList []string
version string
- topo *topology.Topology
+ Topo *topology.Topology
vg *replication.VolumeGrowth
vgLock sync.Mutex
- raftServer *RaftServer
bounedLeaderChan chan int
}
@@ -52,7 +51,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
var e error
- if ms.topo, e = topology.NewTopology("topo", confFile, seq,
+ if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}
@@ -70,42 +69,36 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
r.HandleFunc("/{filekey}", ms.redirectHandler)
- ms.topo.StartRefreshWritableVolumes(garbageThreshold)
+ ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
return ms
}
+// SetRaftServer stores the underlying raft server on ms.Topo, registers a
+// listener that logs every raft leader-change event, and then logs whether
+// this node currently considers itself the leader.
+// NOTE(review): raftServer is dereferenced unconditionally, while
+// NewRaftServer returns nil when raft.NewServer fails — confirm callers never
+// pass nil.
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.raftServer = raftServer
- ms.raftServer.raftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- ms.topo.IsLeader = ms.IsLeader()
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "becomes leader.")
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
})
- ms.topo.IsLeader = ms.IsLeader()
- if ms.topo.IsLeader {
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", "I am the leader!")
+ if ms.Topo.IsLeader() {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
- glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "is the leader.")
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
}
}
-func (ms *MasterServer) IsLeader() bool {
- return ms.raftServer == nil || ms.raftServer.IsLeader()
-}
-
func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
- if ms.IsLeader() {
+ if ms.Topo.IsLeader() {
f(w, r)
} else {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
- targetUrl, err := url.Parse("http://" + ms.raftServer.Leader())
+ targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()})
return
}
- glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader())
+ glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index e5c383747..1ea837a2c 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -19,7 +19,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
}
volumeId, err := storage.NewVolumeId(vid)
if err == nil {
- machines := ms.topo.Lookup(collection, volumeId)
+ machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
ret := []map[string]string{}
for _, dn := range machines {
@@ -54,23 +54,23 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
- if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
- if ms.topo.FreeSpace() <= 0 {
+ if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if ms.Topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
return
} else {
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
- if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
- if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.topo); err != nil {
+ if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.Topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
}
- fid, count, dn, err := ms.topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
+ fid, count, dn, err := ms.Topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -80,7 +80,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.topo.GetCollection(r.FormValue("collection"))
+ collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
if !ok {
writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"})
return
@@ -92,7 +92,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(r.FormValue("collection"))
}
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
@@ -111,7 +111,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
- ms.topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
+ ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024
writeJsonQuiet(w, r, m)
@@ -120,7 +120,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+// dirStatusHandler responds via writeJsonQuiet with a map holding "Version"
+// (ms.version) and "Topology" (the result of ms.Topo.ToMap()).
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = ms.version
- m["Topology"] = ms.topo.ToMap()
+ m["Topology"] = ms.Topo.ToMap()
writeJsonQuiet(w, r, m)
}
@@ -130,7 +130,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold = ms.garbageThreshold
}
debug("garbageThreshold =", gcThreshold)
- ms.topo.Vacuum(gcThreshold)
+ ms.Topo.Vacuum(gcThreshold)
ms.dirStatusHandler(w, r)
}
@@ -139,10 +139,10 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
+ if ms.Topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.topo)
+ count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -160,7 +160,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
+// volumeStatusHandler responds via writeJsonQuiet with a map holding
+// "Version" (ms.version) and "Volumes" (the result of ms.Topo.ToVolumeMap()).
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = ms.version
- m["Volumes"] = ms.topo.ToVolumeMap()
+ m["Volumes"] = ms.Topo.ToVolumeMap()
writeJsonQuiet(w, r, m)
}
@@ -171,7 +171,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
debug("parsing error:", err, r.URL.Path)
return
}
- machines := ms.topo.Lookup("", volumeId)
+ machines := ms.Topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
@@ -181,9 +181,9 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
}
+// submitFromMasterServerHandler delegates to submitForClientHandler. It
+// passes "localhost:<ms.port>" when ms.Topo.IsLeader() is true, and the
+// value of ms.Topo.RaftServer.Leader() otherwise.
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
- if ms.IsLeader() {
+ if ms.Topo.IsLeader() {
submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port))
} else {
- submitForClientHandler(w, r, ms.raftServer.Leader())
+ submitForClientHandler(w, r, ms.Topo.RaftServer.Leader())
}
}
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index 14cd1461b..882bb23f9 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/topology"
"encoding/json"
"errors"
"fmt"
@@ -22,31 +23,35 @@ type RaftServer struct {
httpAddr string
version string
router *mux.Router
+ topo *topology.Topology
}
-func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string) *RaftServer {
+func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
s := &RaftServer{
version: version,
peers: peers,
httpAddr: httpAddr,
dataDir: dataDir,
router: r,
+ topo: topo,
}
if glog.V(4) {
raft.SetLogLevel(2)
}
+ raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
+
var err error
transporter := raft.NewHTTPTransporter("/cluster")
- s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, nil, "")
+ s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil
}
transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(1 * time.Second)
- s.raftServer.SetElectionTimeout(1500 * time.Millisecond)
+ s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond)
s.raftServer.Start()
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
@@ -86,25 +91,6 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin
return s
}
-func (s *RaftServer) Name() string {
- return s.raftServer.Name()
-}
-
-func (s *RaftServer) IsLeader() bool {
- return s.Leader() == s.raftServer.Name()
-}
-
-func (s *RaftServer) Leader() string {
- l := s.raftServer.Leader()
-
- if l == "" {
- // We are a single node cluster, we are the leader
- return s.raftServer.Name()
- }
-
- return l
-}
-
func (s *RaftServer) Peers() (members []string) {
peers := s.raftServer.Peers()
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 4de52bf0a..38943cc8d 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -40,10 +40,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
}
func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) {
- if s.Leader() != "" {
+ if s.topo.Leader() != "" {
//http.StatusMovedPermanently does not cause http POST following redirection
- glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.Leader()+req.URL.Path)
- http.Redirect(w, req, "http://"+s.Leader()+req.URL.Path, http.StatusMovedPermanently)
+ glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.topo.Leader()+req.URL.Path)
+ http.Redirect(w, req, "http://"+s.topo.Leader()+req.URL.Path, http.StatusMovedPermanently)
} else {
glog.V(0).Infoln("Error: Leader Unknown")
http.Error(w, "Leader unknown", http.StatusInternalServerError)
@@ -52,8 +52,8 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request)
+// statusHandler responds via writeJsonQuiet with a map holding "IsLeader"
+// and "Leader" (both read from s.topo) and "Peers" (from s.Peers()).
func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["IsLeader"] = s.IsLeader()
- m["Leader"] = s.Leader()
+ m["IsLeader"] = s.topo.IsLeader()
+ m["Leader"] = s.topo.Leader()
m["Peers"] = s.Peers()
writeJsonQuiet(w, r, m)
}