aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
commit5071f528f649f3f99336c7d491ceef4859e34744 (patch)
tree0c4bc8a286597cd79e22b1ce02cd9cd3b1c44602 /weed/topology
parent55f2627fcf965c3765ad9b63878e9a22a59f4b55 (diff)
downloadseaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.tar.xz
seaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.zip
testing compilation with remove package
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/configuration.go56
-rw-r--r--weed/topology/configuration_test.go42
-rw-r--r--weed/topology/data_center.go41
-rw-r--r--weed/topology/data_node.go60
-rw-r--r--weed/topology/node.go200
-rw-r--r--weed/topology/node_list.go69
-rw-r--r--weed/topology/node_list_test.go39
-rw-r--r--weed/topology/rack.go64
-rw-r--r--weed/topology/topo_test.go127
-rw-r--r--weed/topology/topology.go148
-rw-r--r--weed/topology/topology_compact.go150
-rw-r--r--weed/topology/topology_event_handling.go67
-rw-r--r--weed/topology/topology_map.go50
-rw-r--r--weed/topology/volume_layout.go116
-rw-r--r--weed/topology/volume_location.go58
15 files changed, 0 insertions, 1287 deletions
diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go
deleted file mode 100644
index 4c8424214..000000000
--- a/weed/topology/configuration.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package topology
-
-import (
- "encoding/xml"
-)
-
-type loc struct {
- dcName string
- rackName string
-}
-type rack struct {
- Name string `xml:"name,attr"`
- Ips []string `xml:"Ip"`
-}
-type dataCenter struct {
- Name string `xml:"name,attr"`
- Racks []rack `xml:"Rack"`
-}
-type topology struct {
- DataCenters []dataCenter `xml:"DataCenter"`
-}
-type Configuration struct {
- XMLName xml.Name `xml:"Configuration"`
- Topo topology `xml:"Topology"`
- ip2location map[string]loc
-}
-
-func NewConfiguration(b []byte) (*Configuration, error) {
- c := &Configuration{}
- err := xml.Unmarshal(b, c)
- c.ip2location = make(map[string]loc)
- for _, dc := range c.Topo.DataCenters {
- for _, rack := range dc.Racks {
- for _, ip := range rack.Ips {
- c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
- }
- }
- }
- return c, err
-}
-
-func (c *Configuration) String() string {
- if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
- return string(b)
- }
- return ""
-}
-
-func (c *Configuration) Locate(ip string) (dc string, rack string) {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
- return "DefaultDataCenter", "DefaultRack"
-}
diff --git a/weed/topology/configuration_test.go b/weed/topology/configuration_test.go
deleted file mode 100644
index 35d82c058..000000000
--- a/weed/topology/configuration_test.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package topology
-
-import (
- "fmt"
- "testing"
-)
-
-func TestLoadConfiguration(t *testing.T) {
-
- confContent := `
-
-<?xml version="1.0" encoding="UTF-8" ?>
-<Configuration>
- <Topology>
- <DataCenter name="dc1">
- <Rack name="rack1">
- <Ip>192.168.1.1</Ip>
- </Rack>
- </DataCenter>
- <DataCenter name="dc2">
- <Rack name="rack1">
- <Ip>192.168.1.2</Ip>
- </Rack>
- <Rack name="rack2">
- <Ip>192.168.1.3</Ip>
- <Ip>192.168.1.4</Ip>
- </Rack>
- </DataCenter>
- </Topology>
-</Configuration>
-`
- c, err := NewConfiguration([]byte(confContent))
-
- fmt.Printf("%s\n", c)
- if err != nil {
- t.Fatalf("unmarshal error:%s", err.Error())
- }
-
- if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
- t.Fatalf("unmarshal error:%s", c)
- }
-}
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
deleted file mode 100644
index a3b2b7d13..000000000
--- a/weed/topology/data_center.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package topology
-
-import ()
-
-type DataCenter struct {
- NodeImpl
-}
-
-func NewDataCenter(id string) *DataCenter {
- dc := &DataCenter{}
- dc.id = NodeId(id)
- dc.nodeType = "DataCenter"
- dc.children = make(map[NodeId]Node)
- dc.NodeImpl.value = dc
- return dc
-}
-
-func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- if string(rack.Id()) == rackName {
- return rack
- }
- }
- rack := NewRack(rackName)
- dc.LinkChildNode(rack)
- return rack
-}
-
-func (dc *DataCenter) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = dc.GetMaxVolumeCount()
- m["Free"] = dc.FreeSpace()
- var racks []interface{}
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- racks = append(racks, rack.ToMap())
- }
- m["Racks"] = racks
- return m
-}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
deleted file mode 100644
index dbb634af2..000000000
--- a/weed/topology/data_node.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package topology
-
-import (
- _ "fmt"
- "code.google.com/p/weed-fs/weed/storage"
- "strconv"
-)
-
-type DataNode struct {
- NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
- Dead bool
-}
-
-func NewDataNode(id string) *DataNode {
- s := &DataNode{}
- s.id = NodeId(id)
- s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
- s.NodeImpl.value = s
- return s
-}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
- if _, ok := dn.volumes[v.Id]; !ok {
- dn.volumes[v.Id] = v
- dn.UpAdjustVolumeCountDelta(1)
- dn.UpAdjustActiveVolumeCountDelta(1)
- dn.UpAdjustMaxVolumeId(v.Id)
- } else {
- dn.volumes[v.Id] = v
- }
-}
-func (dn *DataNode) GetTopology() *Topology {
- p := dn.parent
- for p.Parent() != nil {
- p = p.Parent()
- }
- t := p.(*Topology)
- return t
-}
-func (dn *DataNode) MatchLocation(ip string, port int) bool {
- return dn.Ip == ip && dn.Port == port
-}
-func (dn *DataNode) Url() string {
- return dn.Ip + ":" + strconv.Itoa(dn.Port)
-}
-
-func (dn *DataNode) ToMap() interface{} {
- ret := make(map[string]interface{})
- ret["Url"] = dn.Url()
- ret["Volumes"] = dn.GetVolumeCount()
- ret["Max"] = dn.GetMaxVolumeCount()
- ret["Free"] = dn.FreeSpace()
- ret["PublicUrl"] = dn.PublicUrl
- return ret
-}
diff --git a/weed/topology/node.go b/weed/topology/node.go
deleted file mode 100644
index fe69c57c0..000000000
--- a/weed/topology/node.go
+++ /dev/null
@@ -1,200 +0,0 @@
-package topology
-
-import (
- "fmt"
- "code.google.com/p/weed-fs/weed/storage"
-)
-
-type NodeId string
-type Node interface {
- Id() NodeId
- String() string
- FreeSpace() int
- ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
-
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
- GetMaxVolumeId() storage.VolumeId
- SetParent(Node)
- LinkChildNode(node Node)
- UnlinkChildNode(nodeId NodeId)
- CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
-
- IsDataNode() bool
- Children() map[NodeId]Node
- Parent() Node
-
- GetValue() interface{} //get reference to the topology,dc,rack,datanode
-}
-type NodeImpl struct {
- id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
- parent Node
- children map[NodeId]Node
- maxVolumeId storage.VolumeId
-
- //for rack, data center, topology
- nodeType string
- value interface{}
-}
-
-func (n *NodeImpl) IsDataNode() bool {
- return n.nodeType == "DataNode"
-}
-func (n *NodeImpl) IsRack() bool {
- return n.nodeType == "Rack"
-}
-func (n *NodeImpl) IsDataCenter() bool {
- return n.nodeType == "DataCenter"
-}
-func (n *NodeImpl) String() string {
- if n.parent != nil {
- return n.parent.String() + ":" + string(n.id)
- }
- return string(n.id)
-}
-func (n *NodeImpl) Id() NodeId {
- return n.id
-}
-func (n *NodeImpl) FreeSpace() int {
- return n.maxVolumeCount - n.volumeCount
-}
-func (n *NodeImpl) SetParent(node Node) {
- n.parent = node
-}
-func (n *NodeImpl) Children() map[NodeId]Node {
- return n.children
-}
-func (n *NodeImpl) Parent() Node {
- return n.parent
-}
-func (n *NodeImpl) GetValue() interface{} {
- return n.value
-}
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
- ret := false
- var assignedNode *DataNode
- for _, node := range n.children {
- freeSpace := node.FreeSpace()
- //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
- if freeSpace <= 0 {
- continue
- }
- if r >= freeSpace {
- r -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
- }
- ret, assignedNode = node.ReserveOneVolume(r, vid)
- if ret {
- break
- }
- }
- }
- return ret, assignedNode
-}
-
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
- if n.maxVolumeId < vid {
- n.maxVolumeId = vid
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeId(vid)
- }
- }
-}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
- return n.maxVolumeId
-}
-func (n *NodeImpl) GetVolumeCount() int {
- return n.volumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int {
- return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int {
- return n.maxVolumeCount
-}
-
-func (n *NodeImpl) LinkChildNode(node Node) {
- if n.children[node.Id()] == nil {
- n.children[node.Id()] = node
- n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
- n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
- n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
- node.SetParent(n)
- fmt.Println(n, "adds child", node.Id())
- }
-}
-
-func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
- node := n.children[nodeId]
- node.SetParent(nil)
- if node != nil {
- delete(n.children, node.Id())
- n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
- n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
- fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
- }
-}
-
-func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
- if n.IsRack() {
- for _, c := range n.Children() {
- dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen < freshThreshHold {
- if !dn.Dead {
- dn.Dead = true
- n.GetTopology().chanDeadDataNodes <- dn
- }
- }
- for _, v := range dn.volumes {
- if uint64(v.Size) >= volumeSizeLimit {
- //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- v
- }
- }
- }
- } else {
- for _, c := range n.Children() {
- c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
- }
- }
-}
-
-func (n *NodeImpl) GetTopology() *Topology {
- var p Node
- p = n
- for p.Parent() != nil {
- p = p.Parent()
- }
- return p.GetValue().(*Topology)
-}
diff --git a/weed/topology/node_list.go b/weed/topology/node_list.go
deleted file mode 100644
index 597d39b93..000000000
--- a/weed/topology/node_list.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package topology
-
-import (
- "fmt"
- "math/rand"
- "code.google.com/p/weed-fs/weed/storage"
-)
-
-type NodeList struct {
- nodes map[NodeId]Node
- except map[string]Node
-}
-
-func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList {
- m := make(map[NodeId]Node, len(nodes)-len(except))
- for _, n := range nodes {
- if except[n.String()] == nil {
- m[n.Id()] = n
- }
- }
- nl := &NodeList{nodes: m}
- return nl
-}
-
-func (nl *NodeList) FreeSpace() int {
- freeSpace := 0
- for _, n := range nl.nodes {
- freeSpace += n.FreeSpace()
- }
- return freeSpace
-}
-
-func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
- var list []Node
- for _, n := range nl.nodes {
- if n.FreeSpace() >= min {
- list = append(list, n)
- }
- }
- if n > len(list) {
- return nil, false
- }
- for i := n; i > 0; i-- {
- r := rand.Intn(i)
- t := list[r]
- list[r] = list[i-1]
- list[i-1] = t
- }
- return list[len(list)-n:], true
-}
-
-func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
- for _, node := range nl.nodes {
- freeSpace := node.FreeSpace()
- if randomVolumeIndex >= freeSpace {
- randomVolumeIndex -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
- }
- children := node.Children()
- newNodeList := NewNodeList(children, nl.except)
- return newNodeList.ReserveOneVolume(randomVolumeIndex, vid)
- }
- }
- return false, nil
-
-}
diff --git a/weed/topology/node_list_test.go b/weed/topology/node_list_test.go
deleted file mode 100644
index 2fb4fa970..000000000
--- a/weed/topology/node_list_test.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package topology
-
-import (
- _ "fmt"
- "strconv"
- "testing"
-)
-
-func TestXYZ(t *testing.T) {
- topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
- for i := 0; i < 5; i++ {
- dc := NewDataCenter("dc" + strconv.Itoa(i))
- dc.activeVolumeCount = i
- dc.maxVolumeCount = 5
- topo.LinkChildNode(dc)
- }
- nl := NewNodeList(topo.Children(), nil)
-
- picked, ret := nl.RandomlyPickN(1)
- if !ret || len(picked) != 1 {
- t.Errorf("need to randomly pick 1 node")
- }
-
- picked, ret = nl.RandomlyPickN(4)
- if !ret || len(picked) != 4 {
- t.Errorf("need to randomly pick 4 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(5)
- if !ret || len(picked) != 5 {
- t.Errorf("need to randomly pick 5 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(6)
- if ret || len(picked) != 0 {
- t.Errorf("can not randomly pick 6 nodes:", ret, picked)
- }
-
-}
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
deleted file mode 100644
index acc34417a..000000000
--- a/weed/topology/rack.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package topology
-
-import (
- "strconv"
- "time"
-)
-
-type Rack struct {
- NodeImpl
-}
-
-func NewRack(id string) *Rack {
- r := &Rack{}
- r.id = NodeId(id)
- r.nodeType = "Rack"
- r.children = make(map[NodeId]Node)
- r.NodeImpl.value = r
- return r
-}
-
-func (r *Rack) FindDataNode(ip string, port int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- return dn
- }
- }
- return nil
-}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- if dn.Dead {
- dn.Dead = false
- r.GetTopology().chanRecoveredDataNodes <- dn
- dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
- }
- return dn
- }
- }
- dn := NewDataNode(ip + ":" + strconv.Itoa(port))
- dn.Ip = ip
- dn.Port = port
- dn.PublicUrl = publicUrl
- dn.maxVolumeCount = maxVolumeCount
- dn.LastSeen = time.Now().Unix()
- r.LinkChildNode(dn)
- return dn
-}
-
-func (rack *Rack) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = rack.GetMaxVolumeCount()
- m["Free"] = rack.FreeSpace()
- var dns []interface{}
- for _, c := range rack.Children() {
- dn := c.(*DataNode)
- dns = append(dns, dn.ToMap())
- }
- m["DataNodes"] = dns
- return m
-}
diff --git a/weed/topology/topo_test.go b/weed/topology/topo_test.go
deleted file mode 100644
index 99db15b5c..000000000
--- a/weed/topology/topo_test.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "fmt"
- "math/rand"
- "code.google.com/p/weed-fs/weed/storage"
- "testing"
- "time"
-)
-
-var topologyLayout = `
-{
- "dc1":{
- "rack1":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server2":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server1":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server2":{
- "volumes":[],
- "limit":4
- },
- "server3":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":2
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
-}
-`
-
-func setup(topologyLayout string) *Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
-
- //need to connect all nodes first before server adding volumes
- topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- rack := NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(rack)
- for serverKey, serverValue := range rackMap {
- server := NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- rack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- }
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
- }
- }
- }
-
- return topo
-}
-
-func TestRemoveDataCenter(t *testing.T) {
- topo := setup(topologyLayout)
- topo.UnlinkChildNode(NodeId("dc2"))
- if topo.GetActiveVolumeCount() != 15 {
- t.Fail()
- }
- topo.UnlinkChildNode(NodeId("dc3"))
- if topo.GetActiveVolumeCount() != 12 {
- t.Fail()
- }
-}
-
-func TestReserveOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- rand.Seed(1)
- ret, node, vid := topo.RandomlyReserveOneVolume()
- fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
-
-}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
deleted file mode 100644
index ac5505a66..000000000
--- a/weed/topology/topology.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package topology
-
-import (
- "errors"
- "io/ioutil"
- "math/rand"
- "code.google.com/p/weed-fs/weed/directory"
- "code.google.com/p/weed-fs/weed/sequence"
- "code.google.com/p/weed-fs/weed/storage"
-)
-
-type Topology struct {
- NodeImpl
-
- //transient vid~servers mapping for each replication type
- replicaType2VolumeLayout []*VolumeLayout
-
- pulse int64
-
- volumeSizeLimit uint64
-
- sequence sequence.Sequencer
-
- chanDeadDataNodes chan *DataNode
- chanRecoveredDataNodes chan *DataNode
- chanFullVolumes chan storage.VolumeInfo
-
- configuration *Configuration
-}
-
-func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
- t := &Topology{}
- t.id = NodeId(id)
- t.nodeType = "Topology"
- t.NodeImpl.value = t
- t.children = make(map[NodeId]Node)
- t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
- t.pulse = int64(pulse)
- t.volumeSizeLimit = volumeSizeLimit
-
- t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
-
- t.chanDeadDataNodes = make(chan *DataNode)
- t.chanRecoveredDataNodes = make(chan *DataNode)
- t.chanFullVolumes = make(chan storage.VolumeInfo)
-
- t.loadConfiguration(confFile)
-
- return t
-}
-
-func (t *Topology) loadConfiguration(configurationFile string) error {
- b, e := ioutil.ReadFile(configurationFile)
- if e == nil {
- t.configuration, e = NewConfiguration(b)
- }
- return e
-}
-
-func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- if list := vl.Lookup(vid); list != nil {
- return list
- }
- }
- }
- return nil
-}
-
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
- if t.FreeSpace() <= 0 {
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
- return ret, node, &vid
-}
-
-func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
- freeSpace := t.FreeSpace()
- for _, node := range except {
- freeSpace -= node.FreeSpace()
- }
- if freeSpace <= 0 {
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
- return ret, node, &vid
-}
-
-func (t *Topology) NextVolumeId() storage.VolumeId {
- vid := t.GetMaxVolumeId()
- return vid.Next()
-}
-
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
- if err != nil {
- return "", 0, nil, errors.New("No writable volumes avalable!")
- }
- fileId, count := t.sequence.NextFileId(count)
- return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
-}
-
-func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- return t.replicaType2VolumeLayout[replicationTypeIndex]
-}
-
-func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
-}
-
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
- dcName, rackName := t.configuration.Locate(ip)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(ip, port)
- if init && dn != nil {
- t.UnRegisterDataNode(dn)
- }
- dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
- for _, v := range volumeInfos {
- dn.AddOrUpdateVolume(v)
- t.RegisterVolumeLayout(&v, dn)
- }
-}
-
-func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- if string(dc.Id()) == dcName {
- return dc
- }
- }
- dc := NewDataCenter(dcName)
- t.LinkChildNode(dc)
- return dc
-}
diff --git a/weed/topology/topology_compact.go b/weed/topology/topology_compact.go
deleted file mode 100644
index 980f72a6e..000000000
--- a/weed/topology/topology_compact.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "code.google.com/p/weed-fs/weed/storage"
- "code.google.com/p/weed-fs/weed/util"
- "time"
-)
-
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url())
- if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
- //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
- ch <- ret
- }
- }(index, dn.Url(), vid)
- }
- isCheckSuccess := true
- for _ = range locationlist.list {
- select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
- case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
- }
- }
- return isCheckSuccess
-}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- vl.removeFromWritable(vid)
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- fmt.Println(index, "Start vacuuming", vid, "on", dn.Url())
- if e := vacuumVolume_Compact(url, vid); e != nil {
- fmt.Println(index, "Error when vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- fmt.Println(index, "Complete vacuuming", vid, "on", url)
- ch <- true
- }
- }(index, dn.Url(), vid)
- }
- isVacuumSuccess := true
- for _ = range locationlist.list {
- select {
- case _ = <-ch:
- case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
- }
- }
- return isVacuumSuccess
-}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- isCommitSuccess := true
- for _, dn := range locationlist.list {
- fmt.Println("Start Commiting vacuum", vid, "on", dn.Url())
- if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
- fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e)
- isCommitSuccess = false
- } else {
- fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url())
- }
- }
- if isCommitSuccess {
- vl.setVolumeWritable(vid)
- }
- return isCommitSuccess
-}
-func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
- }
- }
- }
- }
- }
- return 0
-}
-
-type VacuumVolumeResult struct {
- Result bool
- Error string
-}
-
-func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("garbageThreshold", garbageThreshold)
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
- if err != nil {
- fmt.Println("parameters:", values)
- return err, false
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err, false
- }
- if ret.Error != "" {
- return errors.New(ret.Error), false
- }
- return nil, ret.Result
-}
-func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
-func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
deleted file mode 100644
index 084dd5b57..000000000
--- a/weed/topology/topology_event_handling.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package topology
-
-import (
- "fmt"
- "math/rand"
- "code.google.com/p/weed-fs/weed/storage"
- "time"
-)
-
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
- go func() {
- for {
- freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
- t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
- time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
- go func(garbageThreshold string) {
- c := time.Tick(15 * time.Minute)
- for _ = range c {
- t.Vacuum(garbageThreshold)
- }
- }(garbageThreshold)
- go func() {
- for {
- select {
- case v := <-t.chanFullVolumes:
- t.SetVolumeCapacityFull(v)
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- fmt.Println("DataNode", dn, "is back alive!")
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- fmt.Println("DataNode", dn, "is dead!")
- }
- }
- }()
-}
-func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
- if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
- return false
- }
- for _, dn := range vl.vid2location[volumeInfo.Id].list {
- dn.UpAdjustActiveVolumeCountDelta(-1)
- }
- return true
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.RepType)
- vl.SetVolumeUnavailable(dn, v.Id)
- }
- dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
- dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
- dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
- dn.Parent().UnlinkChildNode(dn.Id())
-}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.RepType)
- if vl.isWritable(&v) {
- vl.SetVolumeAvailable(dn, v.Id)
- }
- }
-}
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
deleted file mode 100644
index b416ee943..000000000
--- a/weed/topology/topology_map.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package topology
-
-import ()
-
-func (t *Topology) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- var dcs []interface{}
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- dcs = append(dcs, dc.ToMap())
- }
- m["DataCenters"] = dcs
- var layouts []interface{}
- for _, layout := range t.replicaType2VolumeLayout {
- if layout != nil {
- layouts = append(layouts, layout.ToMap())
- }
- }
- m["layouts"] = layouts
- return m
-}
-
-func (t *Topology) ToVolumeMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- dcs := make(map[NodeId]interface{})
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- racks := make(map[NodeId]interface{})
- for _, r := range dc.Children() {
- rack := r.(*Rack)
- dataNodes := make(map[NodeId]interface{})
- for _, d := range rack.Children() {
- dn := d.(*DataNode)
- var volumes []interface{}
- for _, v := range dn.volumes {
- volumes = append(volumes, v)
- }
- dataNodes[d.Id()] = volumes
- }
- racks[r.Id()] = dataNodes
- }
- dcs[dc.Id()] = racks
- }
- m["DataCenters"] = dcs
- return m
-}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
deleted file mode 100644
index 494630d9f..000000000
--- a/weed/topology/volume_layout.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package topology
-
-import (
- "errors"
- "fmt"
- "math/rand"
- "code.google.com/p/weed-fs/weed/storage"
-)
-
-type VolumeLayout struct {
- repType storage.ReplicationType
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- pulse int64
- volumeSizeLimit uint64
-}
-
-func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
- return &VolumeLayout{
- repType: repType,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- pulse: pulse,
- volumeSizeLimit: volumeSizeLimit,
- }
-}
-
-func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
- if _, ok := vl.vid2location[v.Id]; !ok {
- vl.vid2location[v.Id] = NewVolumeLocationList()
- }
- if vl.vid2location[v.Id].Add(dn) {
- if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
- if vl.isWritable(v) {
- vl.writables = append(vl.writables, v.Id)
- }
- }
- }
-}
-
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
- return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
-}
-
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
- return vl.vid2location[vid].list
-}
-
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
- len_writers := len(vl.writables)
- if len_writers <= 0 {
- fmt.Println("No more writable volumes!")
- return nil, 0, nil, errors.New("No more writable volumes!")
- }
- vid := vl.writables[rand.Intn(len_writers)]
- locationList := vl.vid2location[vid]
- if locationList != nil {
- return &vid, count, locationList, nil
- }
- return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
-}
-
-func (vl *VolumeLayout) GetActiveVolumeCount() int {
- return len(vl.writables)
-}
-
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
- for i, v := range vl.writables {
- if v == vid {
- fmt.Println("Volume", vid, "becomes unwritable")
- vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
- return true
- }
- }
- return false
-}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
- for _, v := range vl.writables {
- if v == vid {
- return false
- }
- }
- fmt.Println("Volume", vid, "becomes writable")
- vl.writables = append(vl.writables, vid)
- return true
-}
-
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
- if vl.vid2location[vid].Remove(dn) {
- if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
- fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
- return vl.removeFromWritable(vid)
- }
- }
- return false
-}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
- if vl.vid2location[vid].Add(dn) {
- if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
- return vl.setVolumeWritable(vid)
- }
- }
- return false
-}
-
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
- return vl.removeFromWritable(vid)
-}
-
-func (vl *VolumeLayout) ToMap() interface{} {
- m := make(map[string]interface{})
- m["replication"] = vl.repType.String()
- m["writables"] = vl.writables
- //m["locations"] = vl.vid2location
- return m
-}
diff --git a/weed/topology/volume_location.go b/weed/topology/volume_location.go
deleted file mode 100644
index 507a240b5..000000000
--- a/weed/topology/volume_location.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package topology
-
-import ()
-
-type VolumeLocationList struct {
- list []*DataNode
-}
-
-func NewVolumeLocationList() *VolumeLocationList {
- return &VolumeLocationList{}
-}
-
-func (dnll *VolumeLocationList) Head() *DataNode {
- return dnll.list[0]
-}
-
-func (dnll *VolumeLocationList) Length() int {
- return len(dnll.list)
-}
-
-func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
- for _, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- return false
- }
- }
- dnll.list = append(dnll.list, loc)
- return true
-}
-
-func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
- for i, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
- return true
- }
- }
- return false
-}
-
-func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
- var changed bool
- for _, dnl := range dnll.list {
- if dnl.LastSeen < freshThreshHold {
- changed = true
- break
- }
- }
- if changed {
- var l []*DataNode
- for _, dnl := range dnll.list {
- if dnl.LastSeen >= freshThreshHold {
- l = append(l, dnl)
- }
- }
- dnll.list = l
- }
-}