aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-02-14 17:10:49 -0800
committerChris Lu <chris.lu@gmail.com>2014-02-14 17:10:49 -0800
commitedae676913363bdd1e5a50bf0778fdcc3c6d6051 (patch)
tree8711d0e0382ba15b2810e7783cdede4283d7e1c4
parentef4c2c0d1e5bb45a63cde703013871daa401d1ef (diff)
downloadseaweedfs-edae676913363bdd1e5a50bf0778fdcc3c6d6051.tar.xz
seaweedfs-edae676913363bdd1e5a50bf0778fdcc3c6d6051.zip
1. volume server auto detect clustered master nodes
2. remove operation package dependency on storage
-rw-r--r--go/operation/list_masters.go27
-rw-r--r--go/operation/lookup_volume_id.go14
-rw-r--r--go/replication/allocate_volume.go (renamed from go/operation/allocate_volume.go)2
-rw-r--r--go/replication/store_replicate.go2
-rw-r--r--go/replication/volume_growth.go3
-rw-r--r--go/storage/store.go56
-rw-r--r--go/util/http_util.go (renamed from go/util/post.go)15
-rw-r--r--go/weed/master.go21
-rw-r--r--go/weed/weed_server/volume_server.go8
-rw-r--r--go/weed/weed_server/volume_server_handlers.go2
10 files changed, 127 insertions, 23 deletions
diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go
new file mode 100644
index 000000000..05235aed0
--- /dev/null
+++ b/go/operation/list_masters.go
@@ -0,0 +1,27 @@
+package operation
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/util"
+ "encoding/json"
+)
+
+type ClusterStatusResult struct {
+ IsLeader bool
+ Leader string
+ Peers []string
+}
+
+func ListMasters(server string) ([]string, error) {
+ jsonBlob, err := util.Get("http://" + server + "/cluster/status")
+ glog.V(2).Info("list masters result :", string(jsonBlob))
+ if err != nil {
+ return nil, err
+ }
+ var ret ClusterStatusResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ return ret.Peers, nil
+}
diff --git a/go/operation/lookup_volume_id.go b/go/operation/lookup_volume_id.go
index cd1d3b1bd..6e6035fae 100644
--- a/go/operation/lookup_volume_id.go
+++ b/go/operation/lookup_volume_id.go
@@ -1,12 +1,12 @@
package operation
import (
- "code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
_ "fmt"
"net/url"
+ "strings"
)
type Location struct {
@@ -18,9 +18,9 @@ type LookupResult struct {
Error string `json:"error"`
}
-func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
+func Lookup(server string, vid string) (*LookupResult, error) {
values := make(url.Values)
- values.Add("volumeId", vid.String())
+ values.Add("volumeId", vid)
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
@@ -37,11 +37,11 @@ func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
}
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
- fid, parseErr := storage.ParseFileId(fileId)
- if parseErr != nil {
- return "", parseErr
+ a := strings.Split(fileId, ",")
+ if len(a) != 2 {
+ return "", errors.New("Invalid fileId " + fileId)
}
- lookup, lookupError := Lookup(server, fid.VolumeId)
+ lookup, lookupError := Lookup(server, a[0])
if lookupError != nil {
return "", lookupError
}
diff --git a/go/operation/allocate_volume.go b/go/replication/allocate_volume.go
index 3f96583e5..0f5ebc00f 100644
--- a/go/operation/allocate_volume.go
+++ b/go/replication/allocate_volume.go
@@ -1,4 +1,4 @@
-package operation
+package replication
import (
"code.google.com/p/weed-fs/go/storage"
diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go
index bc630c5d1..3e709de44 100644
--- a/go/replication/store_replicate.go
+++ b/go/replication/store_replicate.go
@@ -71,7 +71,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.
}
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId); lookupErr == nil {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
results := make(chan bool)
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
index 6e5bf1f5c..d7d1c90bd 100644
--- a/go/replication/volume_growth.go
+++ b/go/replication/volume_growth.go
@@ -2,7 +2,6 @@ package replication
import (
"code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"errors"
@@ -200,7 +199,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType
}
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers {
- if err := operation.AllocateVolume(server, vid, collection, repType); err == nil {
+ if err := AllocateVolume(server, vid, collection, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server)
diff --git a/go/storage/store.go b/go/storage/store.go
index 84386cd86..52e78d27d 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -2,10 +2,13 @@ package storage
import (
"code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
+ "errors"
"fmt"
"io/ioutil"
+ "math/rand"
"net/url"
"strconv"
"strings"
@@ -16,16 +19,53 @@ type DiskLocation struct {
maxVolumeCount int
volumes map[VolumeId]*Volume
}
+type MasterNodes struct {
+ nodes []string
+ lastNode int
+}
+
+func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
+ mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
+ return
+}
+func (mn *MasterNodes) reset() {
+ if len(mn.nodes) > 1 && mn.lastNode > 0 {
+ mn.lastNode = -mn.lastNode
+ }
+}
+func (mn *MasterNodes) findMaster() (string, error) {
+ if len(mn.nodes) == 0 {
+ return "", errors.New("No master node found!")
+ }
+ if mn.lastNode < 0 {
+ for _, m := range mn.nodes {
+ if masters, e := operation.ListMasters(m); e == nil {
+ mn.nodes = masters
+ mn.lastNode = rand.Intn(len(mn.nodes))
+ glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
+ break
+ }
+ }
+ }
+ if len(mn.nodes) == 1 {
+ return mn.nodes[0], nil
+ }
+ if mn.lastNode < 0 {
+ return "", errors.New("No master node avalable!")
+ }
+ return mn.nodes[mn.lastNode], nil
+}
+
type Store struct {
Port int
Ip string
PublicUrl string
locations []*DiskLocation
- masterNode string
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
+ masterNodes *MasterNodes
}
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
@@ -199,16 +239,21 @@ type JoinResult struct {
VolumeSizeLimit uint64
}
-func (s *Store) SetMaster(mserver string) {
- s.masterNode = mserver
-}
func (s *Store) SetDataCenter(dataCenter string) {
s.dataCenter = dataCenter
}
func (s *Store) SetRack(rack string) {
s.rack = rack
}
+
+func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
+ s.masterNodes = NewMasterNodes(bootstrapMaster)
+}
func (s *Store) Join() error {
+ masterNode, e := s.masterNodes.findMaster()
+ if e != nil {
+ return e
+ }
stats := new([]*VolumeInfo)
maxVolumeCount := 0
for _, location := range s.locations {
@@ -237,8 +282,9 @@ func (s *Store) Join() error {
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
- jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
+ jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)
if err != nil {
+ s.masterNodes.reset()
return err
}
var ret JoinResult
diff --git a/go/util/post.go b/go/util/http_util.go
index cbc6dcfd5..80589dcfa 100644
--- a/go/util/post.go
+++ b/go/util/http_util.go
@@ -21,3 +21,18 @@ func Post(url string, values url.Values) ([]byte, error) {
}
return b, nil
}
+
+func Get(url string) ([]byte, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ glog.V(0).Infoln("getting ", url, err)
+ return nil, err
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ glog.V(0).Infoln("read get result from", url, err)
+ return nil, err
+ }
+ return b, nil
+}
diff --git a/go/weed/master.go b/go/weed/master.go
index 146974166..97def1948 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -28,8 +28,9 @@ var cmdMaster = &Command{
var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
- mip = cmdMaster.Flag.String("ip", "localhost", "http listen port")
+ masterIp = cmdMaster.Flag.String("ip", "", "master ip address")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
+ masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
@@ -55,16 +56,28 @@ func runMaster(cmd *Command, args []string) bool {
}
r := mux.NewRouter()
- weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
+ ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
*volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList,
)
- glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *mip+":"+strconv.Itoa(*mport))
+ glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
+
srv := &http.Server{
- Addr: *mip + ":" + strconv.Itoa(*mport),
+ Addr: *masterIp + ":" + strconv.Itoa(*mport),
Handler: r,
ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
}
+
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ var peers []string
+ if *masterPeers != "" {
+ peers = strings.Split(*masterPeers, ",")
+ }
+ raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder)
+ ms.SetRaftServer(raftServer)
+ }()
+
e := srv.ListenAndServe()
if e != nil {
glog.Fatalf("Fail to start:%s", e)
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index 732097bde..f42585da2 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -43,7 +43,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
go func() {
connected := true
- vs.store.SetMaster(vs.masterNode)
+ vs.store.SetBootstrapMaster(vs.masterNode)
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
@@ -58,7 +58,11 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
connected = false
}
}
- time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
+ if connected {
+ time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
+ } else {
+ time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond)
+ }
}
}()
glog.V(0).Infoln("store joined at", vs.masterNode)
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index c7671fe37..2f4673763 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -102,7 +102,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
glog.V(2).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) {
- lookupResult, err := operation.Lookup(vs.masterNode, volumeId)
+ lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)