aboutsummaryrefslogtreecommitdiff
path: root/src/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'src/weed/topology')
-rw-r--r--src/weed/topology/configuration.go56
-rw-r--r--src/weed/topology/configuration_test.go42
-rw-r--r--src/weed/topology/data_center.go41
-rw-r--r--src/weed/topology/data_node.go60
-rw-r--r--src/weed/topology/node.go200
-rw-r--r--src/weed/topology/node_list.go69
-rw-r--r--src/weed/topology/node_list_test.go39
-rw-r--r--src/weed/topology/rack.go64
-rw-r--r--src/weed/topology/topo_test.go127
-rw-r--r--src/weed/topology/topology.go148
-rw-r--r--src/weed/topology/topology_compact.go150
-rw-r--r--src/weed/topology/topology_event_handling.go67
-rw-r--r--src/weed/topology/topology_map.go50
-rw-r--r--src/weed/topology/volume_layout.go116
-rw-r--r--src/weed/topology/volume_location.go58
15 files changed, 1287 insertions, 0 deletions
diff --git a/src/weed/topology/configuration.go b/src/weed/topology/configuration.go
new file mode 100644
index 000000000..4c8424214
--- /dev/null
+++ b/src/weed/topology/configuration.go
@@ -0,0 +1,56 @@
+package topology
+
+import (
+ "encoding/xml"
+)
+
+// loc is the (data center, rack) location resolved for one server ip.
+type loc struct {
+ dcName string
+ rackName string
+}
+// rack mirrors a <Rack name="..."><Ip>...</Ip></Rack> XML element.
+type rack struct {
+ Name string `xml:"name,attr"`
+ Ips []string `xml:"Ip"`
+}
+// dataCenter mirrors a <DataCenter name="..."> XML element.
+type dataCenter struct {
+ Name string `xml:"name,attr"`
+ Racks []rack `xml:"Rack"`
+}
+// topology mirrors the <Topology> XML element.
+type topology struct {
+ DataCenters []dataCenter `xml:"DataCenter"`
+}
+// Configuration is the parsed topology configuration file plus a
+// derived ip -> location index (built by NewConfiguration).
+type Configuration struct {
+ XMLName xml.Name `xml:"Configuration"`
+ Topo topology `xml:"Topology"`
+ ip2location map[string]loc
+}
+
+// NewConfiguration parses the XML topology configuration in b and builds
+// the ip -> (data center, rack) lookup table used by Locate.
+// On a parse error the (possibly partially decoded) Configuration is still
+// returned together with the error, but the lookup table is left empty so
+// Locate falls back to its defaults instead of indexing half-decoded data.
+func NewConfiguration(b []byte) (*Configuration, error) {
+	c := &Configuration{}
+	c.ip2location = make(map[string]loc)
+	if err := xml.Unmarshal(b, c); err != nil {
+		return c, err
+	}
+	for _, dc := range c.Topo.DataCenters {
+		for _, rack := range dc.Racks {
+			for _, ip := range rack.Ips {
+				c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
+			}
+		}
+	}
+	return c, nil
+}
+
+// String renders the configuration as indented XML; on a marshalling
+// error it returns the empty string.
+func (c *Configuration) String() string {
+	b, err := xml.MarshalIndent(c, " ", " ")
+	if err != nil {
+		return ""
+	}
+	return string(b)
+}
+
+// Locate maps a server ip to its configured data center and rack names,
+// falling back to "DefaultDataCenter"/"DefaultRack" when the ip is
+// unknown or the configuration (or its index) is missing.
+func (c *Configuration) Locate(ip string) (dc string, rack string) {
+	if c == nil || c.ip2location == nil {
+		return "DefaultDataCenter", "DefaultRack"
+	}
+	found, ok := c.ip2location[ip]
+	if !ok {
+		return "DefaultDataCenter", "DefaultRack"
+	}
+	return found.dcName, found.rackName
+}
diff --git a/src/weed/topology/configuration_test.go b/src/weed/topology/configuration_test.go
new file mode 100644
index 000000000..35d82c058
--- /dev/null
+++ b/src/weed/topology/configuration_test.go
@@ -0,0 +1,42 @@
+package topology
+
+import (
+ "fmt"
+ "testing"
+)
+
+// TestLoadConfiguration verifies that a topology XML document unmarshals
+// into Configuration and that the data-center list is populated.
+func TestLoadConfiguration(t *testing.T) {
+
+ confContent := `
+
+<?xml version="1.0" encoding="UTF-8" ?>
+<Configuration>
+  <Topology>
+    <DataCenter name="dc1">
+      <Rack name="rack1">
+        <Ip>192.168.1.1</Ip>
+      </Rack>
+    </DataCenter>
+    <DataCenter name="dc2">
+      <Rack name="rack1">
+        <Ip>192.168.1.2</Ip>
+      </Rack>
+      <Rack name="rack2">
+        <Ip>192.168.1.3</Ip>
+        <Ip>192.168.1.4</Ip>
+      </Rack>
+    </DataCenter>
+  </Topology>
+</Configuration>
+`
+ c, err := NewConfiguration([]byte(confContent))
+
+ fmt.Printf("%s\n", c)
+ if err != nil {
+ t.Fatalf("unmarshal error:%s", err.Error())
+ }
+
+ // map iteration is not involved here, so dc1 is always first in the slice
+ if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
+ t.Fatalf("unmarshal error:%s", c)
+ }
+}
diff --git a/src/weed/topology/data_center.go b/src/weed/topology/data_center.go
new file mode 100644
index 000000000..a3b2b7d13
--- /dev/null
+++ b/src/weed/topology/data_center.go
@@ -0,0 +1,41 @@
+package topology
+
+import ()
+
+// DataCenter is an inner node of the topology tree whose children are racks.
+type DataCenter struct {
+ NodeImpl
+}
+
+// NewDataCenter creates an empty data center node with the given id and
+// wires the NodeImpl back-reference used by GetValue().
+func NewDataCenter(id string) *DataCenter {
+ dc := &DataCenter{}
+ dc.id = NodeId(id)
+ dc.nodeType = "DataCenter"
+ dc.children = make(map[NodeId]Node)
+ dc.NodeImpl.value = dc
+ return dc
+}
+
+// GetOrCreateRack returns the child rack named rackName, creating and
+// linking a new one when it does not exist yet.
+func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
+	for _, child := range dc.Children() {
+		existing := child.(*Rack)
+		if string(existing.Id()) == rackName {
+			return existing
+		}
+	}
+	newRack := NewRack(rackName)
+	dc.LinkChildNode(newRack)
+	return newRack
+}
+
+// ToMap summarizes this data center (capacity, free slots, racks) for
+// JSON-style status reporting.
+func (dc *DataCenter) ToMap() interface{} {
+	var racks []interface{}
+	for _, child := range dc.Children() {
+		racks = append(racks, child.(*Rack).ToMap())
+	}
+	return map[string]interface{}{
+		"Max":   dc.GetMaxVolumeCount(),
+		"Free":  dc.FreeSpace(),
+		"Racks": racks,
+	}
+}
diff --git a/src/weed/topology/data_node.go b/src/weed/topology/data_node.go
new file mode 100644
index 000000000..bea4729e2
--- /dev/null
+++ b/src/weed/topology/data_node.go
@@ -0,0 +1,60 @@
+package topology
+
+import (
+ _ "fmt"
+ "weed/storage"
+ "strconv"
+)
+
+// DataNode is a leaf of the topology tree: one volume server process,
+// identified by ip:port, holding a set of volumes.
+type DataNode struct {
+ NodeImpl
+ volumes map[storage.VolumeId]storage.VolumeInfo
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
+ Dead bool
+}
+
+// NewDataNode creates an empty data node with the given id and wires the
+// NodeImpl back-reference used by GetValue().
+func NewDataNode(id string) *DataNode {
+ s := &DataNode{}
+ s.id = NodeId(id)
+ s.nodeType = "DataNode"
+ s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.NodeImpl.value = s
+ return s
+}
+// AddOrUpdateVolume records volume v on this node. A volume seen for the
+// first time also bumps the volume/active counters and the max volume id
+// up the tree; a known volume only has its info refreshed.
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+	_, known := dn.volumes[v.Id]
+	dn.volumes[v.Id] = v
+	if !known {
+		dn.UpAdjustVolumeCountDelta(1)
+		dn.UpAdjustActiveVolumeCountDelta(1)
+		dn.UpAdjustMaxVolumeId(v.Id)
+	}
+}
+// GetTopology walks up the parent chain to the root and returns the
+// owning Topology.
+func (dn *DataNode) GetTopology() *Topology {
+	p := dn.parent
+	for p.Parent() != nil {
+		p = p.Parent()
+	}
+	// LinkChildNode stores parents as the embedded *NodeImpl, so a direct
+	// type assertion p.(*Topology) would panic at runtime; go through the
+	// back-reference instead, exactly as NodeImpl.GetTopology does.
+	return p.GetValue().(*Topology)
+}
+// MatchLocation reports whether this node is the server at ip:port.
+func (dn *DataNode) MatchLocation(ip string, port int) bool {
+ return dn.Ip == ip && dn.Port == port
+}
+// Url returns the "ip:port" address of this volume server.
+func (dn *DataNode) Url() string {
+ return dn.Ip + ":" + strconv.Itoa(dn.Port)
+}
+
+// ToMap summarizes this data node for JSON-style status reporting.
+func (dn *DataNode) ToMap() interface{} {
+	return map[string]interface{}{
+		"Url":       dn.Url(),
+		"Volumes":   dn.GetVolumeCount(),
+		"Max":       dn.GetMaxVolumeCount(),
+		"Free":      dn.FreeSpace(),
+		"PublicUrl": dn.PublicUrl,
+	}
+}
diff --git a/src/weed/topology/node.go b/src/weed/topology/node.go
new file mode 100644
index 000000000..0bc85011c
--- /dev/null
+++ b/src/weed/topology/node.go
@@ -0,0 +1,200 @@
+package topology
+
+import (
+ "fmt"
+ "weed/storage"
+)
+
+// NodeId identifies a node within its parent's children map.
+type NodeId string
+
+// Node is one vertex of the topology tree (topology, data center, rack,
+// or data node). The UpAdjust* methods propagate counter changes from a
+// node to all of its ancestors.
+type Node interface {
+ Id() NodeId
+ String() string
+ FreeSpace() int
+ ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
+ UpAdjustVolumeCountDelta(volumeCountDelta int)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+ UpAdjustMaxVolumeId(vid storage.VolumeId)
+
+ GetVolumeCount() int
+ GetActiveVolumeCount() int
+ GetMaxVolumeCount() int
+ GetMaxVolumeId() storage.VolumeId
+ SetParent(Node)
+ LinkChildNode(node Node)
+ UnlinkChildNode(nodeId NodeId)
+ CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
+
+ IsDataNode() bool
+ Children() map[NodeId]Node
+ Parent() Node
+
+ GetValue() interface{} //get reference to the topology,dc,rack,datanode
+}
+
+// NodeImpl is the shared implementation embedded by Topology, DataCenter,
+// Rack and DataNode. The counters are aggregates over the whole subtree.
+type NodeImpl struct {
+ id NodeId
+ volumeCount int
+ activeVolumeCount int
+ maxVolumeCount int
+ parent Node
+ children map[NodeId]Node
+ maxVolumeId storage.VolumeId
+
+ //for rack, data center, topology
+ nodeType string
+ value interface{} // back-reference to the concrete wrapper (see GetValue)
+}
+
+// IsDataNode reports whether this node is a volume-server leaf.
+func (n *NodeImpl) IsDataNode() bool {
+ return n.nodeType == "DataNode"
+}
+// IsRack reports whether this node is a rack.
+func (n *NodeImpl) IsRack() bool {
+ return n.nodeType == "Rack"
+}
+// IsDataCenter reports whether this node is a data center.
+func (n *NodeImpl) IsDataCenter() bool {
+ return n.nodeType == "DataCenter"
+}
+// String renders the node as a colon-separated path from the root.
+func (n *NodeImpl) String() string {
+ if n.parent != nil {
+ return n.parent.String() + ":" + string(n.id)
+ }
+ return string(n.id)
+}
+// Id returns the node's identifier within its parent.
+func (n *NodeImpl) Id() NodeId {
+ return n.id
+}
+// FreeSpace is the number of volume slots still available in this
+// subtree. It can be negative when a node holds more volumes than its
+// configured maximum.
+func (n *NodeImpl) FreeSpace() int {
+ return n.maxVolumeCount - n.volumeCount
+}
+// SetParent attaches (or, with nil, detaches) this node's parent.
+func (n *NodeImpl) SetParent(node Node) {
+ n.parent = node
+}
+// Children exposes the live child map (not a copy).
+func (n *NodeImpl) Children() map[NodeId]Node {
+ return n.children
+}
+// Parent returns this node's parent, nil at the root.
+func (n *NodeImpl) Parent() Node {
+ return n.parent
+}
+// GetValue returns the concrete wrapper (Topology/DataCenter/Rack/DataNode)
+// this NodeImpl is embedded in; set by the New* constructors.
+func (n *NodeImpl) GetValue() interface{} {
+ return n.value
+}
+// ReserveOneVolume picks the data node that contains the r-th free volume
+// slot of this subtree: children are scanned, subtracting each child's
+// free space from r until the child owning slot r is found, then the
+// search descends into that child. Children with no free space are
+// skipped. Note: child iteration order is Go map order (random), which is
+// what makes the pick random for a uniformly chosen r.
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+ ret := false
+ var assignedNode *DataNode
+ for _, node := range n.children {
+ freeSpace := node.FreeSpace()
+ //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+ if freeSpace <= 0 {
+ continue
+ }
+ if r >= freeSpace {
+ r -= freeSpace
+ } else {
+ if node.IsDataNode() && node.FreeSpace() > 0 {
+ //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+ return true, node.(*DataNode)
+ }
+ ret, assignedNode = node.ReserveOneVolume(r, vid)
+ if ret {
+ break
+ }
+ }
+ }
+ return ret, assignedNode
+}
+
+// UpAdjustMaxVolumeCountDelta adds the delta to this node's capacity and
+// propagates it to every ancestor.
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
+ n.maxVolumeCount += maxVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
+ }
+}
+// UpAdjustVolumeCountDelta adds the delta to this node's volume count and
+// propagates it to every ancestor.
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
+ n.volumeCount += volumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+ }
+}
+// UpAdjustActiveVolumeCountDelta adds the delta to this node's active
+// volume count and propagates it to every ancestor.
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
+ n.activeVolumeCount += activeVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+ }
+}
+// UpAdjustMaxVolumeId raises the largest-seen volume id on this node and,
+// only when it actually grew, on every ancestor.
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { // monotonically increasing, never negative
+ if n.maxVolumeId < vid {
+ n.maxVolumeId = vid
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeId(vid)
+ }
+ }
+}
+// GetMaxVolumeId returns the largest volume id seen in this subtree.
+func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+ return n.maxVolumeId
+}
+// GetVolumeCount returns the number of volumes in this subtree.
+func (n *NodeImpl) GetVolumeCount() int {
+ return n.volumeCount
+}
+// GetActiveVolumeCount returns the number of active volumes in this subtree.
+func (n *NodeImpl) GetActiveVolumeCount() int {
+ return n.activeVolumeCount
+}
+// GetMaxVolumeCount returns the volume capacity of this subtree.
+func (n *NodeImpl) GetMaxVolumeCount() int {
+ return n.maxVolumeCount
+}
+
+// LinkChildNode attaches node as a child (no-op when the id is already
+// present), folding the child's aggregate counters into this node and
+// all ancestors before setting the parent pointer.
+func (n *NodeImpl) LinkChildNode(node Node) {
+ if n.children[node.Id()] == nil {
+ n.children[node.Id()] = node
+ n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+ n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
+ n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
+ node.SetParent(n)
+ fmt.Println(n, "adds child", node.Id())
+ }
+}
+
+// UnlinkChildNode removes the child with the given id, detaches it from
+// this parent, and subtracts its aggregate counters from every ancestor.
+// Unknown ids are ignored.
+func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
+	node := n.children[nodeId]
+	if node != nil { // check before touching node: the id may be absent
+		node.SetParent(nil)
+		delete(n.children, node.Id())
+		n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+		n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
+		n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
+		fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
+	}
+}
+
+// CollectDeadNodeAndFullVolumes scans the subtree for trouble: data nodes
+// whose last heartbeat is older than freshThreshHold are reported on the
+// topology's chanDeadDataNodes (once, guarded by the Dead flag), and
+// volumes at or above volumeSizeLimit are reported on chanFullVolumes.
+// Non-rack nodes simply recurse into their children.
+func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
+ if n.IsRack() {
+ for _, c := range n.Children() {
+ dn := c.(*DataNode) //can not cast n to DataNode
+ if dn.LastSeen < freshThreshHold {
+ if !dn.Dead {
+ dn.Dead = true
+ n.GetTopology().chanDeadDataNodes <- dn
+ }
+ }
+ for _, v := range dn.volumes {
+ if uint64(v.Size) >= volumeSizeLimit {
+ //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+ n.GetTopology().chanFullVolumes <- v
+ }
+ }
+ }
+ } else {
+ for _, c := range n.Children() {
+ c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
+ }
+ }
+}
+
+func (n *NodeImpl) GetTopology() *Topology {
+ var p Node
+ p = n
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ return p.GetValue().(*Topology)
+}
diff --git a/src/weed/topology/node_list.go b/src/weed/topology/node_list.go
new file mode 100644
index 000000000..17ab1e0dc
--- /dev/null
+++ b/src/weed/topology/node_list.go
@@ -0,0 +1,69 @@
+package topology
+
+import (
+ "fmt"
+ "math/rand"
+ "weed/storage"
+)
+
+// NodeList is a filtered working set of topology nodes: nodes holds the
+// candidates, except the String()-keyed nodes to exclude from selection.
+type NodeList struct {
+ nodes map[NodeId]Node
+ except map[string]Node
+}
+
+// NewNodeList builds a NodeList from nodes, omitting every node whose
+// String() key appears in except. The except set is also stored so that
+// recursive descents (see NodeList.ReserveOneVolume) honour the same
+// exclusions; the original dropped it, so nested levels ignored it.
+func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList {
+	size := len(nodes) - len(except)
+	if size < 0 {
+		// except may list nodes that are not in this level's map; a
+		// negative make() capacity would panic.
+		size = 0
+	}
+	m := make(map[NodeId]Node, size)
+	for _, n := range nodes {
+		if except[n.String()] == nil {
+			m[n.Id()] = n
+		}
+	}
+	return &NodeList{nodes: m, except: except}
+}
+
+// FreeSpace sums the free volume slots over all nodes in the list.
+func (nl *NodeList) FreeSpace() (total int) {
+	for _, n := range nl.nodes {
+		total += n.FreeSpace()
+	}
+	return total
+}
+
+// RandomlyPickN returns n distinct nodes that each have at least min free
+// volume slots, or (nil, false) when fewer than n qualify. It performs a
+// partial Fisher-Yates shuffle over the first n swap rounds and returns
+// the last n elements of the shuffled slice.
+func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
+ var list []Node
+ for _, n := range nl.nodes {
+ if n.FreeSpace() >= min {
+ list = append(list, n)
+ }
+ }
+ if n > len(list) {
+ return nil, false
+ }
+ for i := n; i > 0; i-- {
+ r := rand.Intn(i)
+ t := list[r]
+ list[r] = list[i-1]
+ list[i-1] = t
+ }
+ return list[len(list)-n:], true
+}
+
+// ReserveOneVolume locates the data node containing the
+// randomVolumeIndex-th free slot among this list's nodes, descending one
+// level at a time (same counting scheme as NodeImpl.ReserveOneVolume).
+// Each recursion rebuilds a NodeList from the chosen node's children with
+// the same except set.
+func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
+ for _, node := range nl.nodes {
+ freeSpace := node.FreeSpace()
+ if randomVolumeIndex >= freeSpace {
+ randomVolumeIndex -= freeSpace
+ } else {
+ if node.IsDataNode() && node.FreeSpace() > 0 {
+ fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+ return true, node.(*DataNode)
+ }
+ children := node.Children()
+ newNodeList := NewNodeList(children, nl.except)
+ return newNodeList.ReserveOneVolume(randomVolumeIndex, vid)
+ }
+ }
+ return false, nil
+
+}
diff --git a/src/weed/topology/node_list_test.go b/src/weed/topology/node_list_test.go
new file mode 100644
index 000000000..2fb4fa970
--- /dev/null
+++ b/src/weed/topology/node_list_test.go
@@ -0,0 +1,39 @@
+package topology
+
+import (
+ _ "fmt"
+ "strconv"
+ "testing"
+)
+
+// TestXYZ exercises NodeList.RandomlyPickN over five data centers, each
+// with 5 free volume slots.
+func TestXYZ(t *testing.T) {
+	topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
+	for i := 0; i < 5; i++ {
+		dc := NewDataCenter("dc" + strconv.Itoa(i))
+		dc.activeVolumeCount = i
+		dc.maxVolumeCount = 5
+		topo.LinkChildNode(dc)
+	}
+	nl := NewNodeList(topo.Children(), nil)
+
+	// RandomlyPickN takes (count, min free space); every dc has 5 free
+	// slots here, so min=1 keeps all five eligible. The original called
+	// it with a single argument, which does not compile.
+	picked, ret := nl.RandomlyPickN(1, 1)
+	if !ret || len(picked) != 1 {
+		t.Errorf("need to randomly pick 1 node")
+	}
+
+	picked, ret = nl.RandomlyPickN(4, 1)
+	if !ret || len(picked) != 4 {
+		t.Errorf("need to randomly pick 4 nodes")
+	}
+
+	picked, ret = nl.RandomlyPickN(5, 1)
+	if !ret || len(picked) != 5 {
+		t.Errorf("need to randomly pick 5 nodes")
+	}
+
+	picked, ret = nl.RandomlyPickN(6, 1)
+	if ret || len(picked) != 0 {
+		// use format verbs: a non-constant-format Errorf fails go vet
+		t.Errorf("can not randomly pick 6 nodes: %v %v", ret, picked)
+	}
+}
diff --git a/src/weed/topology/rack.go b/src/weed/topology/rack.go
new file mode 100644
index 000000000..acc34417a
--- /dev/null
+++ b/src/weed/topology/rack.go
@@ -0,0 +1,64 @@
+package topology
+
+import (
+ "strconv"
+ "time"
+)
+
+// Rack is an inner node of the topology tree whose children are data nodes.
+type Rack struct {
+ NodeImpl
+}
+
+// NewRack creates an empty rack node with the given id and wires the
+// NodeImpl back-reference used by GetValue().
+func NewRack(id string) *Rack {
+ r := &Rack{}
+ r.id = NodeId(id)
+ r.nodeType = "Rack"
+ r.children = make(map[NodeId]Node)
+ r.NodeImpl.value = r
+ return r
+}
+
+// FindDataNode returns the child data node at ip:port, or nil.
+func (r *Rack) FindDataNode(ip string, port int) *DataNode {
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ return dn
+ }
+ }
+ return nil
+}
+// GetOrCreateDataNode returns the data node at ip:port, refreshing its
+// heartbeat timestamp; a node marked dead is revived and announced on the
+// topology's recovered channel. Unknown servers get a new node linked
+// under this rack.
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ dn.LastSeen = time.Now().Unix()
+ if dn.Dead {
+ dn.Dead = false
+ r.GetTopology().chanRecoveredDataNodes <- dn
+ // NOTE(review): the capacity delta is applied only on the
+ // dead->alive transition; a changed maxVolumeCount on a live
+ // node is not picked up here — confirm this is intended.
+ dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
+ }
+ return dn
+ }
+ }
+ dn := NewDataNode(ip + ":" + strconv.Itoa(port))
+ dn.Ip = ip
+ dn.Port = port
+ dn.PublicUrl = publicUrl
+ dn.maxVolumeCount = maxVolumeCount
+ dn.LastSeen = time.Now().Unix()
+ r.LinkChildNode(dn)
+ return dn
+}
+
+// ToMap summarizes this rack (capacity, free slots, data nodes) for
+// JSON-style status reporting.
+func (rack *Rack) ToMap() interface{} {
+	var dns []interface{}
+	for _, child := range rack.Children() {
+		dns = append(dns, child.(*DataNode).ToMap())
+	}
+	return map[string]interface{}{
+		"Max":       rack.GetMaxVolumeCount(),
+		"Free":      rack.FreeSpace(),
+		"DataNodes": dns,
+	}
+}
diff --git a/src/weed/topology/topo_test.go b/src/weed/topology/topo_test.go
new file mode 100644
index 000000000..b77fb8ad8
--- /dev/null
+++ b/src/weed/topology/topo_test.go
@@ -0,0 +1,127 @@
+package topology
+
+import (
+ "encoding/json"
+ "fmt"
+ "math/rand"
+ "weed/storage"
+ "testing"
+ "time"
+)
+
+// topologyLayout is a JSON fixture describing dc -> rack -> server ->
+// volumes, with "limit" as each server's max volume count.
+var topologyLayout = `
+{
+ "dc1":{
+ "rack1":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server2":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server2":{
+ "volumes":[],
+ "limit":4
+ },
+ "server3":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":2
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
+}
+`
+
+// setup builds a Topology tree from the JSON fixture above. Nodes are
+// linked before volumes/limits are added so that the UpAdjust* counter
+// propagation reaches every ancestor.
+func setup(topologyLayout string) *Topology {
+ var data interface{}
+ err := json.Unmarshal([]byte(topologyLayout), &data)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
+
+ //need to connect all nodes first before server adding volumes
+ topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
+ mTopology := data.(map[string]interface{})
+ for dcKey, dcValue := range mTopology {
+ dc := NewDataCenter(dcKey)
+ dcMap := dcValue.(map[string]interface{})
+ topo.LinkChildNode(dc)
+ for rackKey, rackValue := range dcMap {
+ rack := NewRack(rackKey)
+ rackMap := rackValue.(map[string]interface{})
+ dc.LinkChildNode(rack)
+ for serverKey, serverValue := range rackMap {
+ server := NewDataNode(serverKey)
+ serverMap := serverValue.(map[string]interface{})
+ rack.LinkChildNode(server)
+ for _, v := range serverMap["volumes"].([]interface{}) {
+ m := v.(map[string]interface{})
+ vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ }
+ server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ }
+ }
+ }
+
+ return topo
+}
+
+// TestRemoveDataCenter checks that unlinking a data center subtracts its
+// active volume count: the fixture holds 15 volumes in total (dc2 is
+// empty) and dc3 holds 3 of them.
+func TestRemoveDataCenter(t *testing.T) {
+ topo := setup(topologyLayout)
+ topo.UnlinkChildNode(NodeId("dc2"))
+ if topo.GetActiveVolumeCount() != 15 {
+ t.Fail()
+ }
+ topo.UnlinkChildNode(NodeId("dc3"))
+ if topo.GetActiveVolumeCount() != 12 {
+ t.Fail()
+ }
+}
+
+// TestReserveOneVolume exercises the weighted random volume placement.
+// It only prints the outcome; there is no assertion.
+func TestReserveOneVolume(t *testing.T) {
+ topo := setup(topologyLayout)
+ // NOTE(review): the time-based seed below is immediately overwritten by
+ // rand.Seed(1), so it has no effect — one of the two should be removed.
+ rand.Seed(time.Now().UnixNano())
+ rand.Seed(1)
+ ret, node, vid := topo.RandomlyReserveOneVolume()
+ fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
+
+}
diff --git a/src/weed/topology/topology.go b/src/weed/topology/topology.go
new file mode 100644
index 000000000..d0b12def4
--- /dev/null
+++ b/src/weed/topology/topology.go
@@ -0,0 +1,148 @@
+package topology
+
+import (
+ "errors"
+ "io/ioutil"
+ "math/rand"
+ "weed/directory"
+ "weed/sequence"
+ "weed/storage"
+)
+
+// Topology is the root of the node tree. It also owns the per-replication
+// volume layouts, the file-id sequencer, and the channels on which the
+// tree reports dead/recovered data nodes and full volumes.
+type Topology struct {
+ NodeImpl
+
+ //transient vid~servers mapping for each replication type
+ replicaType2VolumeLayout []*VolumeLayout
+
+ pulse int64 // heartbeat interval in seconds
+
+ volumeSizeLimit uint64 // bytes; volumes at/above this are reported full
+
+ sequence sequence.Sequencer
+
+ chanDeadDataNodes chan *DataNode
+ chanRecoveredDataNodes chan *DataNode
+ chanFullVolumes chan storage.VolumeInfo
+
+ configuration *Configuration
+}
+
+// NewTopology creates the root node, one (lazily filled) volume-layout
+// slot per replication type, the sequencer backed by dirname/sequenceFilename,
+// and unbuffered event channels.
+func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
+ t := &Topology{}
+ t.id = NodeId(id)
+ t.nodeType = "Topology"
+ t.NodeImpl.value = t
+ t.children = make(map[NodeId]Node)
+ t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ t.pulse = int64(pulse)
+ t.volumeSizeLimit = volumeSizeLimit
+
+ t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
+
+ t.chanDeadDataNodes = make(chan *DataNode)
+ t.chanRecoveredDataNodes = make(chan *DataNode)
+ t.chanFullVolumes = make(chan storage.VolumeInfo)
+
+ // NOTE(review): the error from loadConfiguration is discarded; a missing
+ // or broken confFile silently falls back to default locations in Locate.
+ t.loadConfiguration(confFile)
+
+ return t
+}
+
+// loadConfiguration reads and parses the topology configuration file,
+// storing the result on t; the read or parse error, if any, is returned.
+func (t *Topology) loadConfiguration(configurationFile string) error {
+	b, e := ioutil.ReadFile(configurationFile)
+	if e != nil {
+		return e
+	}
+	t.configuration, e = NewConfiguration(b)
+	return e
+}
+
+// Lookup returns the data nodes holding the given volume id, or nil when
+// no replication layout knows about it.
+func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
+	for _, vl := range t.replicaType2VolumeLayout {
+		if vl == nil {
+			continue
+		}
+		if list := vl.Lookup(vid); list != nil {
+			return list
+		}
+	}
+	return nil
+}
+
+// RandomlyReserveOneVolume picks a random free volume slot in the whole
+// tree and returns the chosen data node plus the next volume id to use.
+// Returns false when the topology has no free space.
+func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+ if t.FreeSpace() <= 0 {
+ return false, nil, nil
+ }
+ vid := t.NextVolumeId()
+ ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
+ return ret, node, &vid
+}
+
+// RandomlyReserveOneVolumeExcept behaves like RandomlyReserveOneVolume
+// but discounts the free space of the given nodes from the random range.
+func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
+ freeSpace := t.FreeSpace()
+ for _, node := range except {
+ freeSpace -= node.FreeSpace()
+ }
+ if freeSpace <= 0 {
+ return false, nil, nil
+ }
+ vid := t.NextVolumeId()
+ ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+ return ret, node, &vid
+}
+
+// NextVolumeId returns the id following the largest volume id seen so far.
+func (t *Topology) NextVolumeId() storage.VolumeId {
+ vid := t.GetMaxVolumeId()
+ return vid.Next()
+}
+
+// PickForWrite chooses a writable volume for the given replication type
+// and reserves count file ids on it. It returns the new file id string,
+// the granted count, and the first data node holding the volume.
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+	// GetVolumeLayout lazily creates the layout slot, same as before.
+	vid, count, datanodes, err := t.GetVolumeLayout(repType).PickForWrite(count)
+	if err != nil {
+		// fixed typo: "avalable" -> "available"
+		return "", 0, nil, errors.New("No writable volumes available!")
+	}
+	fileId, count := t.sequence.NextFileId(count)
+	return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+}
+
+// GetVolumeLayout returns the volume layout for repType, creating it on
+// first use.
+func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
+ replicationTypeIndex := repType.GetReplicationLevelIndex()
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+ }
+ return t.replicaType2VolumeLayout[replicationTypeIndex]
+}
+
+// RegisterVolumeLayout records that dn holds volume v in the layout for
+// v's replication type.
+func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
+ t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
+}
+
+// RegisterVolumes places the reporting server at its configured data
+// center/rack and records its volumes. With init true an already-known
+// node is unregistered first so its state is rebuilt from scratch.
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
+ dcName, rackName := t.configuration.Locate(ip)
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn := rack.FindDataNode(ip, port)
+ if init && dn != nil {
+ t.UnRegisterDataNode(dn)
+ }
+ dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+ for _, v := range volumeInfos {
+ dn.AddOrUpdateVolume(v)
+ // &v aliases the loop variable but is consumed synchronously inside
+ // RegisterVolume, so the alias does not escape the iteration.
+ t.RegisterVolumeLayout(&v, dn)
+ }
+}
+
+// GetOrCreateDataCenter returns the child data center named dcName,
+// creating and linking a new one when it does not exist yet.
+func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
+	for _, child := range t.Children() {
+		existing := child.(*DataCenter)
+		if string(existing.Id()) == dcName {
+			return existing
+		}
+	}
+	newDc := NewDataCenter(dcName)
+	t.LinkChildNode(newDc)
+	return newDc
+}
diff --git a/src/weed/topology/topology_compact.go b/src/weed/topology/topology_compact.go
new file mode 100644
index 000000000..c2b85fe63
--- /dev/null
+++ b/src/weed/topology/topology_compact.go
@@ -0,0 +1,150 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "weed/storage"
+ "weed/util"
+ "time"
+)
+
+// batchVacuumVolumeCheck asks every replica location of vid, in parallel,
+// whether the volume has enough garbage to vacuum; true only if all
+// replicas agree and none times out.
+func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url())
+ if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
+ //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
+ ch <- ret
+ }
+ }(index, dn.Url(), vid)
+ }
+ isCheckSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case canVacuum := <-ch:
+ isCheckSuccess = isCheckSuccess && canVacuum
+ case <-time.After(30 * time.Minute):
+ isCheckSuccess = false
+ // NOTE(review): this break only exits the select, not the range
+ // loop, so a timeout still waits (up to 30min each) for the
+ // remaining replies — a labeled break may have been intended.
+ break
+ }
+ }
+ return isCheckSuccess
+}
+// batchVacuumVolumeCompact asks every replica location of vid to compact
+// the volume, in parallel, and reports whether every reply arrived within
+// the timeout. The volume is removed from the writable list first.
+func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+	vl.removeFromWritable(vid)
+	ch := make(chan bool, locationlist.Length())
+	for index, dn := range locationlist.list {
+		go func(index int, url string, vid storage.VolumeId) {
+			// Only the url parameter is used inside the goroutine: the
+			// original read the loop variable dn here, which races with
+			// the next iteration and logs the wrong node.
+			fmt.Println(index, "Start vacuuming", vid, "on", url)
+			if e := vacuumVolume_Compact(url, vid); e != nil {
+				fmt.Println(index, "Error when vacuuming", vid, "on", url, e)
+				ch <- false
+			} else {
+				fmt.Println(index, "Complete vacuuming", vid, "on", url)
+				ch <- true
+			}
+		}(index, dn.Url(), vid)
+	}
+	isVacuumSuccess := true
+	for range locationlist.list {
+		select {
+		case <-ch:
+		case <-time.After(30 * time.Minute):
+			// a timed-out replica marks the batch as failed; keep waiting
+			// for the remaining replies as before
+			isVacuumSuccess = false
+		}
+	}
+	return isVacuumSuccess
+}
+// batchVacuumVolumeCommit asks each replica, sequentially, to commit the
+// compacted volume; the volume becomes writable again only when every
+// commit succeeded.
+func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ isCommitSuccess := true
+ for _, dn := range locationlist.list {
+ fmt.Println("Start Commiting vacuum", vid, "on", dn.Url())
+ if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+ fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e)
+ isCommitSuccess = false
+ } else {
+ fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url())
+ }
+ }
+ if isCommitSuccess {
+ vl.setVolumeWritable(vid)
+ }
+ return isCommitSuccess
+}
+// Vacuum runs the check/compact/commit pipeline over every volume of
+// every known layout. The return value is always 0.
+func (t *Topology) Vacuum(garbageThreshold string) int {
+ for _, vl := range t.replicaType2VolumeLayout {
+ if vl != nil {
+ for vid, locationlist := range vl.vid2location {
+ if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(vl, vid, locationlist) {
+ batchVacuumVolumeCommit(vl, vid, locationlist)
+ }
+ }
+ }
+ }
+ }
+ return 0
+}
+
+// VacuumVolumeResult is the JSON reply of the volume-server vacuum
+// admin endpoints.
+type VacuumVolumeResult struct {
+ Result bool
+ Error string
+}
+
+// vacuumVolume_Check asks the volume server at urlLocation whether vid has
+// enough garbage (above garbageThreshold) to justify compaction. The
+// error-first return order is kept for existing callers.
+func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	values.Add("garbageThreshold", garbageThreshold)
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
+	if err != nil {
+		fmt.Println("parameters:", values)
+		return err, false
+	}
+	ret, err := parseVacuumResult(jsonBlob)
+	if err != nil {
+		return err, false
+	}
+	return nil, ret.Result
+}
+
+// vacuumVolume_Compact asks the volume server at urlLocation to compact vid.
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
+	if err != nil {
+		return err
+	}
+	_, err = parseVacuumResult(jsonBlob)
+	return err
+}
+
+// vacuumVolume_Commit asks the volume server at urlLocation to commit the
+// compacted copy of vid.
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
+	if err != nil {
+		return err
+	}
+	_, err = parseVacuumResult(jsonBlob)
+	return err
+}
+
+// parseVacuumResult decodes a VacuumVolumeResult reply, converting an
+// application-level Error field into a Go error.
+func parseVacuumResult(jsonBlob []byte) (VacuumVolumeResult, error) {
+	var ret VacuumVolumeResult
+	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		return ret, err
+	}
+	if ret.Error != "" {
+		return ret, errors.New(ret.Error)
+	}
+	return ret, nil
+}
diff --git a/src/weed/topology/topology_event_handling.go b/src/weed/topology/topology_event_handling.go
new file mode 100644
index 000000000..6afd82dde
--- /dev/null
+++ b/src/weed/topology/topology_event_handling.go
@@ -0,0 +1,67 @@
+package topology
+
+import (
+ "fmt"
+ "math/rand"
+ "weed/storage"
+ "time"
+)
+
+// StartRefreshWritableVolumes starts three background goroutines: one
+// scans the tree for dead nodes and full volumes (jittered around the
+// heartbeat interval), one vacuums every 15 minutes, and one reacts to
+// the events the scanner publishes on the topology channels.
+func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
+ go func() {
+ for {
+ freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
+ t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
+ time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ go func(garbageThreshold string) {
+ c := time.Tick(15 * time.Minute)
+ for _ = range c {
+ t.Vacuum(garbageThreshold)
+ }
+ }(garbageThreshold)
+ go func() {
+ for {
+ select {
+ case v := <-t.chanFullVolumes:
+ t.SetVolumeCapacityFull(v)
+ case dn := <-t.chanRecoveredDataNodes:
+ t.RegisterRecoveredDataNode(dn)
+ fmt.Println("DataNode", dn, "is back alive!")
+ case dn := <-t.chanDeadDataNodes:
+ t.UnRegisterDataNode(dn)
+ fmt.Println("DataNode", dn, "is dead!")
+ }
+ }
+ }()
+}
+// SetVolumeCapacityFull marks the volume unwritable in its layout and,
+// when that changed anything, decrements the active count on every node
+// holding a replica.
+func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
+ vl := t.GetVolumeLayout(volumeInfo.RepType)
+ if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
+ return false
+ }
+ for _, dn := range vl.vid2location[volumeInfo.Id].list {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ return true
+}
+// UnRegisterDataNode removes a dead node: its volumes are marked
+// unavailable in their layouts, its counters are zeroed (which also
+// subtracts them from every ancestor), and it is unlinked from its rack.
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
+ vl := t.GetVolumeLayout(v.RepType)
+ vl.SetVolumeUnavailable(dn, v.Id)
+ }
+ // Zero dn's own counters first: UnlinkChildNode below subtracts
+ // node.Get*Count(), which is 0 by then, so nothing is subtracted twice.
+ dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+ dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
+ dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
+ dn.Parent().UnlinkChildNode(dn.Id())
+}
+// RegisterRecoveredDataNode re-adds a revived node's still-writable
+// volumes to their layouts.
+func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ vl := t.GetVolumeLayout(v.RepType)
+ if vl.isWritable(&v) {
+ vl.SetVolumeAvailable(dn, v.Id)
+ }
+ }
+}
diff --git a/src/weed/topology/topology_map.go b/src/weed/topology/topology_map.go
new file mode 100644
index 000000000..b416ee943
--- /dev/null
+++ b/src/weed/topology/topology_map.go
@@ -0,0 +1,50 @@
+package topology
+
+import ()
+
+// ToMap summarizes the whole topology (capacity, data centers, and the
+// per-replication-type volume layouts) for JSON-style status reporting.
+func (t *Topology) ToMap() interface{} {
+	var dcs []interface{}
+	for _, child := range t.Children() {
+		dcs = append(dcs, child.(*DataCenter).ToMap())
+	}
+	var layouts []interface{}
+	for _, layout := range t.replicaType2VolumeLayout {
+		if layout == nil {
+			continue
+		}
+		layouts = append(layouts, layout.ToMap())
+	}
+	return map[string]interface{}{
+		"Max":         t.GetMaxVolumeCount(),
+		"Free":        t.FreeSpace(),
+		"DataCenters": dcs,
+		"layouts":     layouts,
+	}
+}
+
+// ToVolumeMap reports, per data center / rack / data node, the volume
+// info records currently registered on each node.
+func (t *Topology) ToVolumeMap() interface{} {
+	dcs := make(map[NodeId]interface{})
+	for _, c := range t.Children() {
+		dc := c.(*DataCenter)
+		racks := make(map[NodeId]interface{})
+		for _, r := range dc.Children() {
+			rack := r.(*Rack)
+			dataNodes := make(map[NodeId]interface{})
+			for _, d := range rack.Children() {
+				dn := d.(*DataNode)
+				var volumes []interface{}
+				for _, v := range dn.volumes {
+					volumes = append(volumes, v)
+				}
+				dataNodes[d.Id()] = volumes
+			}
+			racks[r.Id()] = dataNodes
+		}
+		dcs[dc.Id()] = racks
+	}
+	return map[string]interface{}{
+		"Max":         t.GetMaxVolumeCount(),
+		"Free":        t.FreeSpace(),
+		"DataCenters": dcs,
+	}
+}
diff --git a/src/weed/topology/volume_layout.go b/src/weed/topology/volume_layout.go
new file mode 100644
index 000000000..c144c1861
--- /dev/null
+++ b/src/weed/topology/volume_layout.go
@@ -0,0 +1,116 @@
+package topology
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "weed/storage"
+)
+
+// VolumeLayout tracks, for one replication type, which data nodes hold
+// each volume and which volumes currently accept writes.
+type VolumeLayout struct {
+ repType storage.ReplicationType
+ vid2location map[storage.VolumeId]*VolumeLocationList
+ writables []storage.VolumeId // transient array of writable volume id
+ pulse int64
+ volumeSizeLimit uint64
+}
+
+// NewVolumeLayout creates an empty layout for one replication type.
+// The writables slice starts as its nil zero value; the original's
+// `*new([]storage.VolumeId)` produced exactly the same nil slice.
+func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
+	return &VolumeLayout{
+		repType:         repType,
+		vid2location:    make(map[storage.VolumeId]*VolumeLocationList),
+		pulse:           pulse,
+		volumeSizeLimit: volumeSizeLimit,
+	}
+}
+
+// RegisterVolume records that dn holds volume v. The volume joins the
+// writable list only at the moment its replica count reaches exactly the
+// required copy count and it is individually writable.
+func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ if _, ok := vl.vid2location[v.Id]; !ok {
+ vl.vid2location[v.Id] = NewVolumeLocationList()
+ }
+ if vl.vid2location[v.Id].Add(dn) {
+ if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
+ if vl.isWritable(v) {
+ vl.writables = append(vl.writables, v.Id)
+ }
+ }
+ }
+}
+
+// isWritable: a volume accepts writes while below the size limit and at
+// the current storage version.
+func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+ return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
+}
+
+// Lookup returns the replica locations of vid, or nil when the volume is
+// unknown to this layout. (The original indexed the map result
+// unconditionally, panicking on unknown ids since the map yields a nil
+// *VolumeLocationList.)
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+	if location, ok := vl.vid2location[vid]; ok {
+		return location.list
+	}
+	return nil
+}
+
+// PickForWrite picks a uniformly random writable volume and returns its
+// id, the (unchanged) requested count, and its replica locations. Errors
+// when no volume is writable or the picked volume has no known location.
+func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
+ len_writers := len(vl.writables)
+ if len_writers <= 0 {
+ fmt.Println("No more writable volumes!")
+ return nil, 0, nil, errors.New("No more writable volumes!")
+ }
+ vid := vl.writables[rand.Intn(len_writers)]
+ locationList := vl.vid2location[vid]
+ if locationList != nil {
+ return &vid, count, locationList, nil
+ }
+ return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+}
+
+// GetActiveVolumeCount returns how many volumes currently accept writes.
+func (vl *VolumeLayout) GetActiveVolumeCount() int {
+ return len(vl.writables)
+}
+
+// removeFromWritable drops vid from the writable list; reports whether
+// it was present.
+func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+	for i, writable := range vl.writables {
+		if writable != vid {
+			continue
+		}
+		fmt.Println("Volume", vid, "becomes unwritable")
+		vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
+		return true
+	}
+	return false
+}
+// setVolumeWritable appends vid to the writable list unless it is already
+// there; reports whether the list changed.
+func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+	for _, writable := range vl.writables {
+		if writable == vid {
+			return false
+		}
+	}
+	fmt.Println("Volume", vid, "becomes writable")
+	vl.writables = append(vl.writables, vid)
+	return true
+}
+
+// SetVolumeUnavailable drops dn as a location for vid; the volume is
+// pulled from the writable list once its replica count falls below the
+// required copy count. NOTE(review): vl.vid2location[vid] is dereferenced
+// without an existence check — callers must only pass registered vids.
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+ if vl.vid2location[vid].Remove(dn) {
+ if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
+ fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
+ return vl.removeFromWritable(vid)
+ }
+ }
+ return false
+}
+// SetVolumeAvailable adds dn as a location for vid; the volume rejoins
+// the writable list once its replica count reaches the required copy
+// count. Same unchecked map access caveat as SetVolumeUnavailable.
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+ if vl.vid2location[vid].Add(dn) {
+ if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
+ return vl.setVolumeWritable(vid)
+ }
+ }
+ return false
+}
+
+// SetVolumeCapacityFull marks vid unwritable; reports whether it was
+// writable before.
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+ return vl.removeFromWritable(vid)
+}
+
+// ToMap summarizes this layout (replication type and writable volume ids)
+// for JSON-style status reporting.
+func (vl *VolumeLayout) ToMap() interface{} {
+	return map[string]interface{}{
+		"replication": vl.repType.String(),
+		"writables":   vl.writables,
+		//"locations": vl.vid2location,
+	}
+}
diff --git a/src/weed/topology/volume_location.go b/src/weed/topology/volume_location.go
new file mode 100644
index 000000000..507a240b5
--- /dev/null
+++ b/src/weed/topology/volume_location.go
@@ -0,0 +1,58 @@
+package topology
+
+import ()
+
+// VolumeLocationList is the set of data nodes holding one volume's
+// replicas, deduplicated by ip:port.
+type VolumeLocationList struct {
+ list []*DataNode
+}
+
+// NewVolumeLocationList creates an empty location list.
+func NewVolumeLocationList() *VolumeLocationList {
+ return &VolumeLocationList{}
+}
+
+// Head returns the first location; it panics on an empty list, so callers
+// must ensure at least one location exists.
+func (dnll *VolumeLocationList) Head() *DataNode {
+ return dnll.list[0]
+}
+
+// Length returns the number of known locations.
+func (dnll *VolumeLocationList) Length() int {
+ return len(dnll.list)
+}
+
+// Add appends loc unless a node with the same ip:port is already present;
+// reports whether the list changed.
+func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
+	for _, existing := range dnll.list {
+		if existing.Ip == loc.Ip && existing.Port == loc.Port {
+			return false
+		}
+	}
+	dnll.list = append(dnll.list, loc)
+	return true
+}
+
+// Remove deletes the node matching loc's ip:port; reports whether a node
+// was removed.
+func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
+	for i, existing := range dnll.list {
+		if existing.Ip == loc.Ip && existing.Port == loc.Port {
+			dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
+			return true
+		}
+	}
+	return false
+}
+
+// Refresh drops every location whose LastSeen heartbeat is older than
+// freshThreshHold, leaving the list untouched when all are fresh.
+func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
+	stale := false
+	for _, dnl := range dnll.list {
+		if dnl.LastSeen < freshThreshHold {
+			stale = true
+			break
+		}
+	}
+	if !stale {
+		return
+	}
+	var fresh []*DataNode
+	for _, dnl := range dnll.list {
+		if dnl.LastSeen >= freshThreshHold {
+			fresh = append(fresh, dnl)
+		}
+	}
+	dnll.list = fresh
+}