aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/metastore/backing_test.go11
-rw-r--r--go/metastore/etcd_backing.go51
-rw-r--r--go/metastore/file_backing.go18
-rw-r--r--go/metastore/memory_backing.go23
-rw-r--r--go/metastore/metastore.go24
-rw-r--r--go/replication/volume_growth_test.go3
-rw-r--r--go/sequence/memory_sequencer.go19
-rw-r--r--go/sequence/sequence.go35
-rw-r--r--go/topology/node_list_test.go3
-rw-r--r--go/topology/topo_test.go3
-rw-r--r--go/topology/topology.go4
-rw-r--r--go/weed/master.go11
12 files changed, 149 insertions, 56 deletions
diff --git a/go/metastore/backing_test.go b/go/metastore/backing_test.go
index 2a2c23323..c8497fe4b 100644
--- a/go/metastore/backing_test.go
+++ b/go/metastore/backing_test.go
@@ -14,13 +14,18 @@ func TestFileBacking(t *testing.T) {
verifySetGet(t, ms)
}
+func TestEtcdBacking(t *testing.T) {
+ ms := &MetaStore{NewMetaStoreEtcdBacking("http://localhost:4001")}
+ verifySetGet(t, ms)
+}
+
func verifySetGet(t *testing.T, ms *MetaStore) {
data := uint64(234234)
- ms.SetUint64(data, "/tmp", "sequence")
- if !ms.Has("/tmp", "sequence") {
+ ms.SetUint64("/tmp/sequence", data)
+ if !ms.Has("/tmp/sequence") {
t.Errorf("Failed to set data")
}
- if val, err := ms.GetUint64("/tmp", "sequence"); err == nil {
+ if val, err := ms.GetUint64("/tmp/sequence"); err == nil {
if val != data {
t.Errorf("Set %d, but read back %d", data, val)
}
diff --git a/go/metastore/etcd_backing.go b/go/metastore/etcd_backing.go
new file mode 100644
index 000000000..ac5f2b1a5
--- /dev/null
+++ b/go/metastore/etcd_backing.go
@@ -0,0 +1,51 @@
+package metastore
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "errors"
+ "github.com/coreos/go-etcd/etcd"
+ "strings"
+)
+
+// store data on etcd
+
+type MetaStoreEtcdBacking struct {
+ client *etcd.Client
+}
+
+func NewMetaStoreEtcdBacking(etcdCluster string) *MetaStoreEtcdBacking {
+ m := &MetaStoreEtcdBacking{}
+ m.client = etcd.NewClient(strings.Split(etcdCluster, ","))
+ return m
+}
+
+func (m MetaStoreEtcdBacking) Set(path, val string) error {
+ res, e := m.client.Set(path, val, 0)
+ glog.V(0).Infof("etcd set response: %+v\n", res)
+ return e
+}
+
+func (m MetaStoreEtcdBacking) Get(path string) (string, error) {
+ results, err := m.client.Get(path)
+ for i, res := range results {
+ glog.V(0).Infof("[%d] get response: %+v\n", i, res)
+ }
+ if err != nil {
+ return "", err
+ }
+ if results[0].Key != path {
+ return "", errors.New("Key Not Found:" + path)
+ }
+ return results[0].Value, nil
+}
+
+func (m MetaStoreEtcdBacking) Has(path string) (ok bool) {
+ results, err := m.client.Get(path)
+ if err != nil {
+ return false
+ }
+ if results[0].Key != path {
+ return false
+ }
+ return true
+}
diff --git a/go/metastore/file_backing.go b/go/metastore/file_backing.go
index 5fb3b39cc..1dc0c963f 100644
--- a/go/metastore/file_backing.go
+++ b/go/metastore/file_backing.go
@@ -3,7 +3,6 @@ package metastore
import (
"io/ioutil"
"os"
- "path"
)
// store data on disk, enough for most cases
@@ -11,21 +10,22 @@ import (
type MetaStoreFileBacking struct {
}
-func NewMetaStoreFileBacking() MetaStoreFileBacking {
- mms := MetaStoreFileBacking{}
+func NewMetaStoreFileBacking() *MetaStoreFileBacking {
+ mms := &MetaStoreFileBacking{}
return mms
}
-func (mms MetaStoreFileBacking) Set(val []byte, elem ...string) error {
- return ioutil.WriteFile(path.Join(elem...), val, 0644)
+func (mms *MetaStoreFileBacking) Set(path, val string) error {
+ return ioutil.WriteFile(path, []byte(val), 0644)
}
-func (mms MetaStoreFileBacking) Get(elem ...string) (val []byte, err error) {
- return ioutil.ReadFile(path.Join(elem...))
+func (mms *MetaStoreFileBacking) Get(path string) (string, error) {
+ val, e := ioutil.ReadFile(path)
+ return string(val), e
}
-func (mms MetaStoreFileBacking) Has(elem ...string) (ok bool) {
- seqFile, se := os.OpenFile(path.Join(elem...), os.O_RDONLY, 0644)
+func (mms *MetaStoreFileBacking) Has(path string) (ok bool) {
+ seqFile, se := os.OpenFile(path, os.O_RDONLY, 0644)
if se != nil {
return false
}
diff --git a/go/metastore/memory_backing.go b/go/metastore/memory_backing.go
index 86957225a..4f45c2e5f 100644
--- a/go/metastore/memory_backing.go
+++ b/go/metastore/memory_backing.go
@@ -2,36 +2,35 @@ package metastore
import (
"fmt"
- "path"
)
//this is for testing only
type MetaStoreMemoryBacking struct {
- m map[string][]byte
+ m map[string]string
}
-func NewMetaStoreMemoryBacking() MetaStoreMemoryBacking {
- mms := MetaStoreMemoryBacking{}
- mms.m = make(map[string][]byte)
+func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking {
+ mms := &MetaStoreMemoryBacking{}
+ mms.m = make(map[string]string)
return mms
}
-func (mms MetaStoreMemoryBacking) Set(val []byte, elem ...string) error {
- mms.m[path.Join(elem...)] = val
+func (mms MetaStoreMemoryBacking) Set(path, val string) error {
+ mms.m[path] = val
return nil
}
-func (mms MetaStoreMemoryBacking) Get(elem ...string) (val []byte, err error) {
+func (mms MetaStoreMemoryBacking) Get(path string) (val string, err error) {
var ok bool
- val, ok = mms.m[path.Join(elem...)]
+ val, ok = mms.m[path]
if !ok {
- return nil, fmt.Errorf("Missing value for %s", path.Join(elem...))
+ return "", fmt.Errorf("Missing value for %s", path)
}
return
}
-func (mms MetaStoreMemoryBacking) Has(elem ...string) (ok bool) {
- _, ok = mms.m[path.Join(elem...)]
+func (mms MetaStoreMemoryBacking) Has(path string) (ok bool) {
+ _, ok = mms.m[path]
return
}
diff --git a/go/metastore/metastore.go b/go/metastore/metastore.go
index da87dbe85..2b1fdc6d8 100644
--- a/go/metastore/metastore.go
+++ b/go/metastore/metastore.go
@@ -1,35 +1,33 @@
package metastore
import (
- "code.google.com/p/weed-fs/go/util"
"errors"
- "path"
+ "strconv"
)
type MetaStoreBacking interface {
- Get(elem ...string) ([]byte, error)
- Set(val []byte, elem ...string) error
- Has(elem ...string) bool
+ Get(path string) (string, error)
+ Set(path, val string) error
+ Has(path string) bool
}
type MetaStore struct {
MetaStoreBacking
}
-func (m *MetaStore) SetUint64(val uint64, elem ...string) error {
- b := make([]byte, 8)
- util.Uint64toBytes(b, val)
- return m.Set(b, elem...)
+func (m *MetaStore) SetUint64(path string, val uint64) error {
+ return m.Set(path, strconv.FormatUint(val, 10))
}
-func (m *MetaStore) GetUint64(elem ...string) (val uint64, err error) {
- if b, e := m.Get(elem...); e == nil && len(b) == 8 {
- val = util.BytesToUint64(b)
+func (m *MetaStore) GetUint64(path string) (val uint64, err error) {
+ if b, e := m.Get(path); e == nil {
+ val, err = strconv.ParseUint(b, 10, 64)
+ return
} else {
if e != nil {
return 0, e
}
- err = errors.New("Not found value for " + path.Join(elem...))
+ err = errors.New("Not found value for " + path)
}
return
}
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
index 031972bee..99f82a7fa 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/replication/volume_growth_test.go
@@ -1,6 +1,7 @@
package replication
import (
+ "code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"encoding/json"
@@ -79,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
//need to connect all nodes first before server adding volumes
topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
- "/tmp", "testing", 32*1024, 5)
+ sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())
}
diff --git a/go/sequence/memory_sequencer.go b/go/sequence/memory_sequencer.go
new file mode 100644
index 000000000..d72952ff4
--- /dev/null
+++ b/go/sequence/memory_sequencer.go
@@ -0,0 +1,19 @@
+package sequence
+
+import ()
+
+// just for testing
+type MemorySequencer struct {
+ counter uint64
+}
+
+func NewMemorySequencer() (m *MemorySequencer) {
+ m = &MemorySequencer{counter: 1}
+ return
+}
+
+func (m *MemorySequencer) NextFileId(count int) (uint64, int) {
+ ret := m.counter
+ m.counter += uint64(count)
+ return ret, count
+}
diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go
index 774607e54..bbc4bdf82 100644
--- a/go/sequence/sequence.go
+++ b/go/sequence/sequence.go
@@ -5,7 +5,6 @@ import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/metastore"
"encoding/gob"
- "path"
"sync"
)
@@ -17,8 +16,7 @@ type Sequencer interface {
NextFileId(count int) (uint64, int)
}
type SequencerImpl struct {
- dir string
- fileName string
+ fileFullPath string
volumeLock sync.Mutex
sequenceLock sync.Mutex
@@ -29,19 +27,30 @@ type SequencerImpl struct {
metaStore *metastore.MetaStore
}
-func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
- m = &SequencerImpl{dir: dirname, fileName: filename}
+func NewFileSequencer(filepath string) (m *SequencerImpl) {
+ m = &SequencerImpl{fileFullPath: filepath}
m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()}
+ m.initilize()
+ return
+}
+
+func NewEtcdSequencer(etcdCluster string) (m *SequencerImpl) {
+ m = &SequencerImpl{fileFullPath: "/weedfs/default/sequence"}
+ m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreEtcdBacking(etcdCluster)}
+ m.initilize()
+ return
+}
- if !m.metaStore.Has(m.dir, m.fileName+".seq") {
+func (m *SequencerImpl) initilize() {
+ if !m.metaStore.Has(m.fileFullPath) {
m.FileIdSequence = FileIdSaveInterval
glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
} else {
var err error
- if m.FileIdSequence, err = m.metaStore.GetUint64(m.dir, m.fileName+".seq"); err != nil {
- if data, err := m.metaStore.Get(m.dir, m.fileName+".seq"); err == nil {
+ if m.FileIdSequence, err = m.metaStore.GetUint64(m.fileFullPath); err != nil {
+ if data, err := m.metaStore.Get(m.fileFullPath); err == nil {
m.FileIdSequence = decode(data)
- glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence)
+ glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence)
} else {
glog.V(0).Infof("No existing FileIdSequence: %s", err)
}
@@ -69,16 +78,16 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
return m.FileIdSequence - m.fileIdCounter - uint64(count), count
}
func (m *SequencerImpl) saveSequence() {
- glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
- if e := m.metaStore.SetUint64(m.FileIdSequence, m.dir, m.fileName+".seq"); e != nil {
+ glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath)
+ if e := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence); e != nil {
glog.Fatalf("Sequence id Save [ERROR] %s", e)
}
}
//decode are for backward compatible purpose
-func decode(input []byte) uint64 {
+func decode(input string) uint64 {
var x uint64
- b := bytes.NewReader(input)
+ b := bytes.NewReader([]byte(input))
decoder := gob.NewDecoder(b)
if e := decoder.Decode(&x); e == nil {
return x
diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go
index c7b165ea6..c526f55f8 100644
--- a/go/topology/node_list_test.go
+++ b/go/topology/node_list_test.go
@@ -1,13 +1,14 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/sequence"
_ "fmt"
"strconv"
"testing"
)
func TestXYZ(t *testing.T) {
- topo, err := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5)
if err != nil {
t.Error("cannot create new topology:", err)
t.FailNow()
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index d5ea08086..36f4963db 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -1,6 +1,7 @@
package topology
import (
+ "code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"encoding/json"
"fmt"
@@ -78,7 +79,7 @@ func setup(topologyLayout string) *Topology {
}
//need to connect all nodes first before server adding volumes
- topo, err := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ topo, err := NewTopology("mynetwork", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5)
if err != nil {
fmt.Println("error:", err)
}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d0e9fb42b..b21601210 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -28,7 +28,7 @@ type Topology struct {
configuration *Configuration
}
-func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) (*Topology, error) {
+func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@@ -38,7 +38,7 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
- t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
+ t.sequence = seq
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
diff --git a/go/weed/master.go b/go/weed/master.go
index c1ada76fb..950aaca6d 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -5,12 +5,14 @@ import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"encoding/json"
"errors"
"net/http"
"os"
+ "path"
"runtime"
"strconv"
"strings"
@@ -43,6 +45,7 @@ var (
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ etcdCluster = cmdMaster.Flag.String("etcd", "", "comma separated etcd urls, e.g., http://localhost:4001, See github.com/coreos/go-etcd/etcd")
masterWhiteList []string
)
@@ -215,8 +218,14 @@ func runMaster(cmd *Command, args []string) bool {
if *masterWhiteListOption != "" {
masterWhiteList = strings.Split(*masterWhiteListOption, ",")
}
+ var seq sequence.Sequencer
+ if len(*etcdCluster) == 0 {
+ seq = sequence.NewFileSequencer(path.Join(*metaFolder, "weed.seq"))
+ } else {
+ seq = sequence.NewEtcdSequencer(*etcdCluster)
+ }
var e error
- if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed",
+ if topo, e = topology.NewTopology("topo", *confFile, seq,
uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}