aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/metastore/backing_test.go30
-rw-r--r--go/metastore/file_backing.go34
-rw-r--r--go/metastore/memory_backing.go36
-rw-r--r--go/metastore/metastore.go33
-rw-r--r--go/sequence/memory_sequencer.go21
-rw-r--r--go/sequence/sequence.go86
-rw-r--r--go/storage/cdb_map.go4
-rw-r--r--go/storage/needle_map.go13
-rw-r--r--go/storage/store.go8
-rw-r--r--go/topology/topology.go9
-rw-r--r--go/weed/master.go3
-rw-r--r--go/weed/server.go3
-rw-r--r--go/weed/weed_server/master_server.go7
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go3
-rw-r--r--go/weed/weed_server/raft_server.go20
-rw-r--r--go/weed/weed_server/raft_server_handlers.go2
-rw-r--r--go/weed/weed_server/volume_server.go4
17 files changed, 60 insertions, 256 deletions
diff --git a/go/metastore/backing_test.go b/go/metastore/backing_test.go
deleted file mode 100644
index a3de491c7..000000000
--- a/go/metastore/backing_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package metastore
-
-import (
- "testing"
-)
-
-func TestMemoryBacking(t *testing.T) {
- ms := &MetaStore{NewMetaStoreMemoryBacking()}
- verifySetGet(t, ms)
-}
-
-func TestFileBacking(t *testing.T) {
- ms := &MetaStore{NewMetaStoreFileBacking()}
- verifySetGet(t, ms)
-}
-
-func verifySetGet(t *testing.T, ms *MetaStore) {
- data := uint64(234234)
- ms.SetUint64("/tmp/sequence", data)
- if !ms.Has("/tmp/sequence") {
- t.Errorf("Failed to set data")
- }
- if val, err := ms.GetUint64("/tmp/sequence"); err == nil {
- if val != data {
- t.Errorf("Set %d, but read back %d", data, val)
- }
- } else {
- t.Errorf("Failed to get back data:%s", err)
- }
-}
diff --git a/go/metastore/file_backing.go b/go/metastore/file_backing.go
deleted file mode 100644
index 1dc0c963f..000000000
--- a/go/metastore/file_backing.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package metastore
-
-import (
- "io/ioutil"
- "os"
-)
-
-// store data on disk, enough for most cases
-
-type MetaStoreFileBacking struct {
-}
-
-func NewMetaStoreFileBacking() *MetaStoreFileBacking {
- mms := &MetaStoreFileBacking{}
- return mms
-}
-
-func (mms *MetaStoreFileBacking) Set(path, val string) error {
- return ioutil.WriteFile(path, []byte(val), 0644)
-}
-
-func (mms *MetaStoreFileBacking) Get(path string) (string, error) {
- val, e := ioutil.ReadFile(path)
- return string(val), e
-}
-
-func (mms *MetaStoreFileBacking) Has(path string) (ok bool) {
- seqFile, se := os.OpenFile(path, os.O_RDONLY, 0644)
- if se != nil {
- return false
- }
- defer seqFile.Close()
- return true
-}
diff --git a/go/metastore/memory_backing.go b/go/metastore/memory_backing.go
deleted file mode 100644
index 4f45c2e5f..000000000
--- a/go/metastore/memory_backing.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package metastore
-
-import (
- "fmt"
-)
-
-//this is for testing only
-
-type MetaStoreMemoryBacking struct {
- m map[string]string
-}
-
-func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking {
- mms := &MetaStoreMemoryBacking{}
- mms.m = make(map[string]string)
- return mms
-}
-
-func (mms MetaStoreMemoryBacking) Set(path, val string) error {
- mms.m[path] = val
- return nil
-}
-
-func (mms MetaStoreMemoryBacking) Get(path string) (val string, err error) {
- var ok bool
- val, ok = mms.m[path]
- if !ok {
- return "", fmt.Errorf("Missing value for %s", path)
- }
- return
-}
-
-func (mms MetaStoreMemoryBacking) Has(path string) (ok bool) {
- _, ok = mms.m[path]
- return
-}
diff --git a/go/metastore/metastore.go b/go/metastore/metastore.go
deleted file mode 100644
index 2b1fdc6d8..000000000
--- a/go/metastore/metastore.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package metastore
-
-import (
- "errors"
- "strconv"
-)
-
-type MetaStoreBacking interface {
- Get(path string) (string, error)
- Set(path, val string) error
- Has(path string) bool
-}
-
-type MetaStore struct {
- MetaStoreBacking
-}
-
-func (m *MetaStore) SetUint64(path string, val uint64) error {
- return m.Set(path, strconv.FormatUint(val, 10))
-}
-
-func (m *MetaStore) GetUint64(path string) (val uint64, err error) {
- if b, e := m.Get(path); e == nil {
- val, err = strconv.ParseUint(b, 10, 64)
- return
- } else {
- if e != nil {
- return 0, e
- }
- err = errors.New("Not found value for " + path)
- }
- return
-}
diff --git a/go/sequence/memory_sequencer.go b/go/sequence/memory_sequencer.go
index d72952ff4..c7ee1ae8f 100644
--- a/go/sequence/memory_sequencer.go
+++ b/go/sequence/memory_sequencer.go
@@ -1,10 +1,13 @@
package sequence
-import ()
+import (
+ "sync"
+)
// just for testing
type MemorySequencer struct {
- counter uint64
+ counter uint64
+ sequenceLock sync.Mutex
}
func NewMemorySequencer() (m *MemorySequencer) {
@@ -13,7 +16,21 @@ func NewMemorySequencer() (m *MemorySequencer) {
}
func (m *MemorySequencer) NextFileId(count int) (uint64, int) {
+ m.sequenceLock.Lock()
+ defer m.sequenceLock.Unlock()
ret := m.counter
m.counter += uint64(count)
return ret, count
}
+
+func (m *MemorySequencer) SetMax(seenValue uint64) {
+ m.sequenceLock.Lock()
+ defer m.sequenceLock.Unlock()
+ if m.counter <= seenValue {
+ m.counter = seenValue + 1
+ }
+}
+
+func (m *MemorySequencer) Peek() uint64 {
+ return m.counter
+}
diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go
index 493804ec6..5a1bceaaf 100644
--- a/go/sequence/sequence.go
+++ b/go/sequence/sequence.go
@@ -1,89 +1,9 @@
package sequence
-import (
- "bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/metastore"
- "encoding/gob"
- "sync"
-)
-
-const (
- FileIdSaveInterval = 10000
-)
+import ()
type Sequencer interface {
NextFileId(count int) (uint64, int)
-}
-type SequencerImpl struct {
- fileFullPath string
-
- volumeLock sync.Mutex
- sequenceLock sync.Mutex
-
- FileIdSequence uint64
- fileIdCounter uint64
-
- metaStore *metastore.MetaStore
-}
-
-func NewFileSequencer(filepath string) (m *SequencerImpl) {
- m = &SequencerImpl{fileFullPath: filepath}
- m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()}
- m.initilize()
- return
-}
-
-func (m *SequencerImpl) initilize() {
- if !m.metaStore.Has(m.fileFullPath) {
- m.FileIdSequence = FileIdSaveInterval
- glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
- } else {
- var err error
- if m.FileIdSequence, err = m.metaStore.GetUint64(m.fileFullPath); err != nil {
- if data, err := m.metaStore.Get(m.fileFullPath); err == nil {
- m.FileIdSequence = decode(data)
- glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence)
- } else {
- glog.V(0).Infof("No existing FileIdSequence: %s", err)
- }
- } else {
- glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence)
- }
- //in case the server stops between intervals
- }
- return
-}
-
-//count should be 1 or more
-func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
- if count <= 0 {
- return 0, 0
- }
- m.sequenceLock.Lock()
- defer m.sequenceLock.Unlock()
- if m.fileIdCounter < uint64(count) {
- m.fileIdCounter = FileIdSaveInterval
- m.FileIdSequence += FileIdSaveInterval
- m.saveSequence()
- }
- m.fileIdCounter = m.fileIdCounter - uint64(count)
- return m.FileIdSequence - m.fileIdCounter - uint64(count), count
-}
-func (m *SequencerImpl) saveSequence() {
- glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath)
- if e := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence); e != nil {
- glog.Fatalf("Sequence id Save [ERROR] %s", e)
- }
-}
-
-//decode are for backward compatible purpose
-func decode(input string) uint64 {
- var x uint64
- b := bytes.NewReader([]byte(input))
- decoder := gob.NewDecoder(b)
- if e := decoder.Decode(&x); e == nil {
- return x
- }
- return 0
+ SetMax(uint64)
+ Peek() uint64
}
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
index 8be302111..14437c45b 100644
--- a/go/storage/cdb_map.go
+++ b/go/storage/cdb_map.go
@@ -80,8 +80,8 @@ func (m cdbMap) FileCount() int {
func (m *cdbMap) DeletedCount() int {
return m.DeletionCounter
}
-func (m *cdbMap) NextFileKey(count int) uint64 {
- return 0
+func (m *cdbMap) MaxFileKey() uint64 {
+ return m.MaximumFileKey
}
func getMetric(c *cdb.Cdb, m *mapMetric) error {
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 9b8776c5e..9c77fcf73 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -19,7 +19,7 @@ type NeedleMapper interface {
FileCount() int
DeletedCount() int
Visit(visit func(NeedleValue) error) (err error)
- NextFileKey(count int) uint64
+ MaxFileKey() uint64
}
type mapMetric struct {
@@ -110,6 +110,9 @@ func walkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
+ if key > nm.MaximumFileKey {
+ nm.MaximumFileKey = key
+ }
oldSize := nm.m.Set(Key(key), offset, size)
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
@@ -172,11 +175,3 @@ func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
func (nm NeedleMap) MaxFileKey() uint64 {
return nm.MaximumFileKey
}
-func (nm NeedleMap) NextFileKey(count int) (ret uint64) {
- if count <= 0 {
- return 0
- }
- ret = nm.MaximumFileKey
- nm.MaximumFileKey += uint64(count)
- return
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index 157344781..a4263dcac 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -44,6 +44,9 @@ func (mn *MasterNodes) findMaster() (string, error) {
if mn.lastNode < 0 {
for _, m := range mn.nodes {
if masters, e := operation.ListMasters(m); e == nil {
+ if len(masters) == 0 {
+ continue
+ }
mn.nodes = masters
mn.lastNode = rand.Intn(len(mn.nodes))
glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
@@ -268,6 +271,7 @@ func (s *Store) Join() error {
}
stats := new([]*VolumeInfo)
maxVolumeCount := 0
+ var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
@@ -280,6 +284,9 @@ func (s *Store) Join() error {
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s)
+ if maxFileKey < v.nm.MaxFileKey() {
+ maxFileKey = v.nm.MaxFileKey()
+ }
}
}
bytes, _ := json.Marshal(stats)
@@ -292,6 +299,7 @@ func (s *Store) Join() error {
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
+ values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)
diff --git a/go/topology/topology.go b/go/topology/topology.go
index b1fa3f2a2..9db3e78ae 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -19,7 +19,7 @@ type Topology struct {
volumeSizeLimit uint64
- sequence sequence.Sequencer
+ Sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
@@ -40,7 +40,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
- t.sequence = seq
+ t.Sequence = seq
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
@@ -118,7 +118,7 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
- fileId, count := t.sequence.NextFileId(count)
+ fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
@@ -143,7 +143,8 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) {
+ t.Sequence.SetMax(maxFileKey)
dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
diff --git a/go/weed/master.go b/go/weed/master.go
index 1efa0e79c..c494ff42b 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -72,11 +72,12 @@ func runMaster(cmd *Command, args []string) bool {
go func() {
time.Sleep(100 * time.Millisecond)
+ myAddress := *masterIp + ":" + strconv.Itoa(*mport)
var peers []string
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
+ raftServer := weed_server.NewRaftServer(r, peers, myAddress, *metaFolder, ms.Topo, *mpulse)
ms.SetRaftServer(raftServer)
}()
diff --git a/go/weed/server.go b/go/weed/server.go
index 87b541fd3..61e42fb36 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -164,11 +164,12 @@ func runServer(cmd *Command, args []string) bool {
go func() {
raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond)
+ myAddress := *serverIp + ":" + strconv.Itoa(*masterPort)
var peers []string
if *serverPeers != "" {
peers = strings.Split(*serverPeers, ",")
}
- raftServer := weed_server.NewRaftServer(r, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
+ raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse)
ms.SetRaftServer(raftServer)
volumeWait.Done()
}()
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index b932e1b11..874688cbb 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httputil"
"net/url"
- "path"
"sync"
)
@@ -48,7 +47,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
whiteList: whiteList,
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
+ seq := sequence.NewMemorySequencer()
var e error
if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
@@ -97,7 +96,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
- } else if ms.Topo.RaftServer.Leader() != "" {
+ } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
@@ -111,7 +110,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.ServeHTTP(w, r)
} else {
//drop it to the floor
- writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index d34baa349..e549a1dfb 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -36,6 +36,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
port, _ := strconv.Atoi(r.FormValue("port"))
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64)
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
@@ -44,7 +45,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
- ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
+ ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack"))
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index b5a9dd9b6..f67caaebd 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -77,13 +77,6 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
return nil
}
}
- var err error
- for err != nil {
- glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...")
- time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond)
- err = s.Join(s.peers)
- }
- glog.V(0).Infoln("Joined cluster")
}
// Initialize the server by joining itself.
@@ -124,14 +117,17 @@ func (s *RaftServer) Join(peers []string) error {
ConnectionString: "http://" + s.httpAddr,
}
+ var err error
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
-
for _, m := range peers {
+ if m == s.httpAddr {
+ continue
+ }
target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m))
glog.V(0).Infoln("Attempting to connect to:", target)
- err := postFollowingOneRedirect(target, "application/json", &b)
+ err = postFollowingOneRedirect(target, "application/json", &b)
if err != nil {
glog.V(0).Infoln("Post returned error: ", err.Error())
@@ -139,11 +135,9 @@ func (s *RaftServer) Join(peers []string) error {
// If we receive a network error try the next member
continue
}
-
- return err
+ } else {
+ return nil
}
-
- return nil
}
return errors.New("Could not connect to any cluster peers")
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 6a3c58b29..1ce24a963 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -18,7 +18,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
commandText, _ := ioutil.ReadAll(req.Body)
glog.V(0).Info("Command:", string(commandText))
if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil {
- glog.V(0).Infoln("Error decoding json message:", err)
+ glog.V(0).Infoln("Error decoding json message:", err, string(commandText))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index 377e1e99c..a2db0d8a9 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -54,10 +54,10 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicUrl string, fo
if err == nil {
if !connected {
connected = true
- glog.V(0).Infoln("Reconnected with master")
+ glog.V(0).Infoln("Volume Server Connected with master")
}
} else {
- glog.V(4).Infoln("Failing to talk with master:", err.Error())
+ glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
if connected {
connected = false
}