aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-06-19 18:10:38 -0700
committerChris Lu <chris.lu@gmail.com>2013-06-19 18:10:38 -0700
commit50269b74ce615ab02f6bf64a2bc0fc9e71122267 (patch)
tree887f63247a589cb027e65331b9243edcad61f479 /go
parent715d327df0ad64a70837711c664e1ef024e0bcc5 (diff)
downloadseaweedfs-50269b74ce615ab02f6bf64a2bc0fc9e71122267.tar.xz
seaweedfs-50269b74ce615ab02f6bf64a2bc0fc9e71122267.zip
add dataCenter option when assign file keys
add dataCenter option when starting volume servers. Some work related to freezing a volume; not tested yet.
Diffstat (limited to 'go')
-rw-r--r--go/replication/store_replicate.go2
-rw-r--r--go/replication/volume_growth.go48
-rw-r--r--go/replication/volume_growth_test.go13
-rw-r--r--go/storage/needle_map.go70
-rw-r--r--go/storage/store.go20
-rw-r--r--go/storage/volume.go52
-rw-r--r--go/topology/configuration.go17
-rw-r--r--go/topology/data_node.go5
-rw-r--r--go/topology/node.go15
-rw-r--r--go/topology/node_list.go30
-rw-r--r--go/topology/node_list_test.go24
-rw-r--r--go/topology/topo_test.go5
-rw-r--r--go/topology/topology.go28
-rw-r--r--go/topology/volume_layout.go44
-rw-r--r--go/weed/master.go12
-rw-r--r--go/weed/volume.go17
16 files changed, 287 insertions, 115 deletions
diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go
index cc5d806d2..8306a31b4 100644
--- a/go/replication/store_replicate.go
+++ b/go/replication/store_replicate.go
@@ -3,7 +3,7 @@ package replication
import (
"bytes"
"code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/storage"
"log"
"net/http"
"strconv"
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
index 1f266f73f..ce54b1fd4 100644
--- a/go/replication/volume_growth.go
+++ b/go/replication/volume_growth.go
@@ -31,24 +31,24 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
}
-func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
+func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (int, error) {
switch repType {
case storage.Copy000:
- return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy1factor, repType, dataCenter, topo)
case storage.Copy001:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
case storage.Copy010:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
case storage.Copy100:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo)
case storage.Copy110:
- return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
case storage.Copy200:
- return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ return vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo)
}
return 0, errors.New("Unknown Replication Type!")
}
-func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
@@ -56,16 +56,20 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
switch repType {
case storage.Copy000:
for i := 0; i < count; i++ {
- if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
+ if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
if err = vg.grow(topo, *vid, repType, server); err == nil {
counter++
+ } else {
+ return counter, err
}
+ } else {
+ return counter, fmt.Errorf("Failed to grown volume for data center %s", dataCenter)
}
}
case storage.Copy001:
for i := 0; i < count; i++ {
- //randomly pick one server, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ //randomly pick one server from the datacenter, and then choose from the same rack
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
rack := server1.Parent()
exclusion := make(map[string]topology.Node)
exclusion[server1.String()] = server1
@@ -81,8 +85,8 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
case storage.Copy010:
for i := 0; i < count; i++ {
- //randomly pick one server, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ //randomly pick one server from the datacenter, and then choose from a different rack
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
rack := server1.Parent()
dc := rack.Parent()
exclusion := make(map[string]topology.Node)
@@ -100,28 +104,32 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
case storage.Copy100:
for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 1)
+ picked, ret := nl.RandomlyPickN(2, 1, dataCenter)
vid := topo.NextVolumeId()
+ println("growing on picked servers", picked)
if ret {
var servers []*topology.DataNode
for _, n := range picked {
if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+ if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
servers = append(servers, server)
}
}
}
+ println("growing on servers", servers)
if len(servers) == 2 {
if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++
}
}
+ } else {
+ return counter, fmt.Errorf("Failed to grown volume on data center %s and another data center", dataCenter)
}
}
case storage.Copy110:
for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 2)
+ picked, ret := nl.RandomlyPickN(2, 2, dataCenter)
vid := topo.NextVolumeId()
if ret {
var servers []*topology.DataNode
@@ -130,7 +138,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
dc1, dc2 = dc2, dc1
}
if dc1.FreeSpace() > 0 {
- if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
+ if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok {
servers = append(servers, server1)
rack := server1.Parent()
exclusion := make(map[string]topology.Node)
@@ -144,7 +152,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
if dc2.FreeSpace() > 0 {
- if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
+ if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok {
servers = append(servers, server)
}
}
@@ -158,13 +166,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
case storage.Copy200:
for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(3, 1)
+ picked, ret := nl.RandomlyPickN(3, 1, dataCenter)
vid := topo.NextVolumeId()
if ret {
var servers []*topology.DataNode
for _, n := range picked {
if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+ if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
servers = append(servers, server)
}
}
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
index a4104716e..031972bee 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/replication/volume_growth_test.go
@@ -5,9 +5,7 @@ import (
"code.google.com/p/weed-fs/go/topology"
"encoding/json"
"fmt"
- "math/rand"
"testing"
- "time"
)
var topologyLayout = `
@@ -80,7 +78,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo, err := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf",
+ topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
"/tmp", "testing", 32*1024, 5)
if err != nil {
panic("error: " + err.Error())
@@ -125,12 +123,3 @@ func TestRemoveDataCenter(t *testing.T) {
t.Fail()
}
}
-
-func TestReserveOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
- if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
- t.Log("reserved", c)
- }
-}
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index c836a87fb..89773b341 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -52,37 +52,61 @@ const (
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
- bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024)
- bytes := make([]byte, 16*RowsToRead)
- count, e := bufferReader.Read(bytes)
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
+ nm.FileCounter++
+ nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
+ return nil
+ })
+ return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
+ br := bufio.NewReaderSize(r, 1024*1024)
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := br.Read(bytes)
+ var (
+ key uint64
+ offset, size uint32
+ i int
+ )
- count, e = bufferReader.Read(bytes)
+ for count > 0 && e == nil {
+ for i = 0; i+16 <= count; i += 16 {
+ key = util.BytesToUint64(bytes[i : i+8])
+ offset = util.BytesToUint32(bytes[i+8 : i+12])
+ size = util.BytesToUint32(bytes[i+12 : i+16])
+ if e = fn(key, offset, size); e != nil {
+ return e
+ }
+ }
+ if count%16 != 0 {
+ copy(bytes[:count-i], bytes[i:count])
+ i = count - i
+ count, e = br.Read(bytes[i:])
+ count += i
+ } else {
+ count, e = br.Read(bytes)
+ }
}
if e == io.EOF {
- e = nil
+ return nil
}
- return nm, e
+ return e
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
diff --git a/go/storage/store.go b/go/storage/store.go
index 954bae0ae..0889c9330 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -20,6 +20,8 @@ type Store struct {
MaxVolumeCount int
masterNode string
+ dataCenter string //optional informaton, overwriting master setting if exists
+ rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
@@ -99,6 +101,16 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
}
return s.volumes[vid].commitCompact()
}
+func (s *Store) FreezeVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+ }
+ if s.volumes[vid].readOnly {
+ return errors.New("Volume " + volumeIdString + " is already read-only")
+ }
+ return s.volumes[vid].freeze()
+}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {
@@ -138,6 +150,12 @@ type JoinResult struct {
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
+func (s *Store) SetDataCenter(dataCenter string) {
+ s.dataCenter = dataCenter
+}
+func (s *Store) SetRack(rack string) {
+ s.rack = rack
+}
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
@@ -159,6 +177,8 @@ func (s *Store) Join() error {
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+ values.Add("dataCenter", s.dataCenter)
+ values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
if err != nil {
return err
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 98f712433..4e6db3634 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -70,10 +70,29 @@ func (v *Volume) load(alsoLoadIndex bool) error {
e = v.maybeWriteSuperBlock()
}
if e == nil && alsoLoadIndex {
- indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
- if ie != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
- }
+ var indexFile *os.File
+ if v.readOnly {
+ if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) {
+ return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e)
+ }
+ if indexFile != nil {
+ log.Printf("converting %s.idx to %s.cdb", fileName, fileName)
+ if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
+ log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName)
+ } else {
+ indexFile.Close()
+ os.Remove(indexFile.Name())
+ indexFile = nil
+ }
+ }
+ v.nm, e = OpenCdbMap(fileName + ".cdb")
+ return e
+ } else {
+ indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ if e != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ }
v.nm, e = LoadNeedleMap(indexFile)
}
return e
@@ -224,6 +243,31 @@ func (v *Volume) commitCompact() error {
}
return nil
}
+func (v *Volume) freeze() error {
+ if v.readOnly {
+ return nil
+ }
+ nm, ok := v.nm.(*NeedleMap)
+ if !ok {
+ return nil
+ }
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ bn, _ := nakeFilename(v.dataFile.Name())
+ cdbFn := bn + ".cdb"
+ log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn)
+ err := DumpNeedleMapToCdb(cdbFn, nm)
+ if err != nil {
+ return err
+ }
+ if v.nm, err = OpenCdbMap(cdbFn); err != nil {
+ return err
+ }
+ nm.indexFile.Close()
+ os.Remove(nm.indexFile.Name())
+ v.readOnly = true
+ return nil
+}
func ScanVolumeFile(dirname string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 4c8424214..058600a7c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -46,11 +46,20 @@ func (c *Configuration) String() string {
return ""
}
-func (c *Configuration) Locate(ip string) (dc string, rack string) {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
+func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
+ if dcName == "" {
+ if c != nil && c.ip2location != nil {
+ if loc, ok := c.ip2location[ip]; ok {
+ return loc.dcName, loc.rackName
+ }
+ }
+ } else {
+ if rackName == "" {
+ return dcName, "DefaultRack"
+ } else {
+ return dcName, rackName
}
}
+
return "DefaultDataCenter", "DefaultRack"
}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ea4ea5d39..3a6edb447 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -34,8 +34,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.volumes[v.Id] = v
}
}
+func (dn *DataNode) GetDataCenter() *DataCenter {
+ return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+}
func (dn *DataNode) GetTopology() *Topology {
- p := dn.parent
+ p := dn.Parent()
for p.Parent() != nil {
p = p.Parent()
}
diff --git a/go/topology/node.go b/go/topology/node.go
index 786f76702..d61f01244 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -10,7 +10,7 @@ type Node interface {
Id() NodeId
String() string
FreeSpace() int
- ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+ ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -26,6 +26,8 @@ type Node interface {
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
IsDataNode() bool
+ IsRack() bool
+ IsDataCenter() bool
Children() map[NodeId]Node
Parent() Node
@@ -78,23 +80,26 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
ret := false
var assignedNode *DataNode
for _, node := range n.children {
freeSpace := node.FreeSpace()
- //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+ // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
+ if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
+ continue
+ }
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
- //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+ // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return true, node.(*DataNode)
}
- ret, assignedNode = node.ReserveOneVolume(r, vid)
+ ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
if ret {
break
}
diff --git a/go/topology/node_list.go b/go/topology/node_list.go
index db7723714..2be90b123 100644
--- a/go/topology/node_list.go
+++ b/go/topology/node_list.go
@@ -30,23 +30,37 @@ func (nl *NodeList) FreeSpace() int {
return freeSpace
}
-func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
+func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) {
var list []Node
+ var preferredNode *Node
+ if firstNodeName != "" {
+ for _, n := range nl.nodes {
+ if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace {
+ preferredNode = &n
+ break
+ }
+ }
+ if preferredNode == nil {
+ return list, false
+ }
+ }
+
for _, n := range nl.nodes {
- if n.FreeSpace() >= min {
+ if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) {
list = append(list, n)
}
}
- if n > len(list) {
+ if count > len(list) || count == len(list) && firstNodeName != "" {
return nil, false
}
- for i := n; i > 0; i-- {
+ for i := len(list); i > 0; i-- {
r := rand.Intn(i)
- t := list[r]
- list[r] = list[i-1]
- list[i-1] = t
+ list[r], list[i-1] = list[i-1], list[r]
+ }
+ if firstNodeName != "" {
+ list[0] = *preferredNode
}
- return list[len(list)-n:], true
+ return list[:count], true
}
func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go
index c6e530724..0037cbaa9 100644
--- a/go/topology/node_list_test.go
+++ b/go/topology/node_list_test.go
@@ -20,22 +20,38 @@ func TestXYZ(t *testing.T) {
}
nl := NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(1, 0)
+ picked, ret := nl.RandomlyPickN(1, 0, "")
if !ret || len(picked) != 1 {
t.Error("need to randomly pick 1 node")
}
- picked, ret = nl.RandomlyPickN(4, 0)
+ picked, ret = nl.RandomlyPickN(1, 0, "dc1")
+ if !ret || len(picked) != 1 {
+ t.Error("need to randomly pick 1 node")
+ }
+ if picked[0].Id() != "dc1" {
+ t.Error("need to randomly pick 1 dc1 node")
+ }
+
+ picked, ret = nl.RandomlyPickN(2, 0, "dc1")
+ if !ret || len(picked) != 2 {
+ t.Error("need to randomly pick 1 node")
+ }
+ if picked[0].Id() != "dc1" {
+ t.Error("need to randomly pick 2 with one dc1 node")
+ }
+
+ picked, ret = nl.RandomlyPickN(4, 0, "")
if !ret || len(picked) != 4 {
t.Error("need to randomly pick 4 nodes")
}
- picked, ret = nl.RandomlyPickN(5, 0)
+ picked, ret = nl.RandomlyPickN(5, 0, "")
if !ret || len(picked) != 5 {
t.Error("need to randomly pick 5 nodes")
}
- picked, ret = nl.RandomlyPickN(6, 0)
+ picked, ret = nl.RandomlyPickN(6, 0, "")
if ret || len(picked) != 0 {
t.Error("can not randomly pick 6 nodes:", ret, picked)
}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index 99e570821..d5ea08086 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -127,7 +127,10 @@ func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano())
rand.Seed(1)
- ret, node, vid := topo.RandomlyReserveOneVolume()
+ ret, node, vid := topo.RandomlyReserveOneVolume("dc1")
+ if node.Parent().Parent().Id() != NodeId("dc1") {
+ t.Fail()
+ }
fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 5dcc56204..e488319d1 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -4,6 +4,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "fmt"
"io/ioutil"
"log"
"math/rand"
@@ -71,25 +72,13 @@ func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
return nil
}
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) {
if t.FreeSpace() <= 0 {
+ fmt.Println("Topology does not have free space left!")
return false, nil, nil
}
vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
- return ret, node, &vid
-}
-
-func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
- freeSpace := t.FreeSpace()
- for _, node := range except {
- freeSpace -= node.FreeSpace()
- }
- if freeSpace <= 0 {
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+ ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter)
return ret, node, &vid
}
@@ -98,12 +87,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next()
}
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
- vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
+ vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -114,6 +103,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str
func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ fmt.Println("adding replication type", repType)
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
return t.replicaType2VolumeLayout[replicationTypeIndex]
@@ -123,8 +113,8 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
}
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
- dcName, rackName := t.configuration.Locate(ip)
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
+ dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(ip, port)
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index cd725c132..d8ed49b0b 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -51,22 +51,52 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
return nil
}
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
fmt.Println("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- vid := vl.writables[rand.Intn(len_writers)]
- locationList := vl.vid2location[vid]
- if locationList != nil {
+ if dataCenter == "" {
+ vid := vl.writables[rand.Intn(len_writers)]
+ locationList := vl.vid2location[vid]
+ if locationList != nil {
+ return &vid, count, locationList, nil
+ }
+ return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+ } else {
+ var vid storage.VolumeId
+ var locationList *VolumeLocationList
+ counter := 0
+ for _, v := range vl.writables {
+ volumeLocationList := vl.vid2location[v]
+ for _, dn := range volumeLocationList.list {
+ if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ counter++
+ if rand.Intn(counter) < 1 {
+ vid, locationList = v, volumeLocationList
+ }
+ }
+ }
+ }
return &vid, count, locationList, nil
}
- return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+ return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
}
-func (vl *VolumeLayout) GetActiveVolumeCount() int {
- return len(vl.writables)
+func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
+ if dataCenter == "" {
+ return len(vl.writables)
+ }
+ counter := 0
+ for _, v := range vl.writables {
+ for _, dn := range vl.vid2location[v].list {
+ if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ counter++
+ }
+ }
+ }
+ return counter
}
func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
diff --git a/go/weed/master.go b/go/weed/master.go
index f6cc88df0..7da7831bf 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -77,25 +77,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" {
repType = *defaultRepType
}
+ dataCenter := r.FormValue("dataCenter")
rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
return
}
- if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+
+ if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
if topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
return
} else {
- if _, err = vg.GrowByType(rt, topo); err != nil {
+ if _, err = vg.GrowByType(rt, dataCenter, topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
- fid, count, dn, err := topo.PickForWrite(rt, c)
+ fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -120,7 +122,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
- topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
+ topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
writeJsonQuiet(w, r, m)
@@ -151,7 +153,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else {
- count, err = vg.GrowByCountAndType(count, rt, topo)
+ count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo)
}
} else {
err = errors.New("parameter count is not found")
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 33121388e..6cbbceaef 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -2,7 +2,7 @@ package main
import (
"code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/storage"
"log"
"math/rand"
@@ -38,6 +38,8 @@ var (
maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
+ rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
store *storage.Store
)
@@ -86,6 +88,16 @@ func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
}
debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
}
+func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
+ //TODO: notify master that this volume will be read-only
+ err := store.FreezeVolume(r.FormValue("volume"))
+ if err == nil {
+ writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
+ } else {
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ }
+ debug("freeze volume =", r.FormValue("volume"), ", error =", err)
+}
func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
@@ -289,10 +301,13 @@ func runVolume(cmd *Command, args []string) bool {
http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
+ http.HandleFunc("/admin/freeze_volume", freezeVolumeHandler)
go func() {
connected := true
store.SetMaster(*masterNode)
+ store.SetDataCenter(*dataCenter)
+ store.SetRack(*rack)
for {
err := store.Join()
if err == nil {