aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/node.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-10 03:25:35 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-10 03:25:35 -0800
commitab6fb13ad795763ba7fc7c91696d2890be6da543 (patch)
tree3d956452bdcef7ea4b50457faff07df06f8d4346 /weed/topology/node.go
parent1b6f53cdac9a44370f67276ede138f5cde8806c2 (diff)
downloadseaweedfs-ab6fb13ad795763ba7fc7c91696d2890be6da543.tar.xz
seaweedfs-ab6fb13ad795763ba7fc7c91696d2890be6da543.zip
avoid the "src" folder
Diffstat (limited to 'weed/topology/node.go')
-rw-r--r--weed/topology/node.go200
1 files changed, 200 insertions, 0 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go
new file mode 100644
index 000000000..0bc85011c
--- /dev/null
+++ b/weed/topology/node.go
@@ -0,0 +1,200 @@
+package topology
+
+import (
+ "fmt"
+ "weed/storage"
+)
+
+type NodeId string
+type Node interface {
+ Id() NodeId
+ String() string
+ FreeSpace() int
+ ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
+ UpAdjustVolumeCountDelta(volumeCountDelta int)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+ UpAdjustMaxVolumeId(vid storage.VolumeId)
+
+ GetVolumeCount() int
+ GetActiveVolumeCount() int
+ GetMaxVolumeCount() int
+ GetMaxVolumeId() storage.VolumeId
+ SetParent(Node)
+ LinkChildNode(node Node)
+ UnlinkChildNode(nodeId NodeId)
+ CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
+
+ IsDataNode() bool
+ Children() map[NodeId]Node
+ Parent() Node
+
+ GetValue() interface{} //get reference to the topology,dc,rack,datanode
+}
+type NodeImpl struct {
+ id NodeId
+ volumeCount int
+ activeVolumeCount int
+ maxVolumeCount int
+ parent Node
+ children map[NodeId]Node
+ maxVolumeId storage.VolumeId
+
+ //for rack, data center, topology
+ nodeType string
+ value interface{}
+}
+
+func (n *NodeImpl) IsDataNode() bool {
+ return n.nodeType == "DataNode"
+}
+func (n *NodeImpl) IsRack() bool {
+ return n.nodeType == "Rack"
+}
+func (n *NodeImpl) IsDataCenter() bool {
+ return n.nodeType == "DataCenter"
+}
+func (n *NodeImpl) String() string {
+ if n.parent != nil {
+ return n.parent.String() + ":" + string(n.id)
+ }
+ return string(n.id)
+}
+func (n *NodeImpl) Id() NodeId {
+ return n.id
+}
+func (n *NodeImpl) FreeSpace() int {
+ return n.maxVolumeCount - n.volumeCount
+}
+func (n *NodeImpl) SetParent(node Node) {
+ n.parent = node
+}
+func (n *NodeImpl) Children() map[NodeId]Node {
+ return n.children
+}
+func (n *NodeImpl) Parent() Node {
+ return n.parent
+}
+func (n *NodeImpl) GetValue() interface{} {
+ return n.value
+}
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+ ret := false
+ var assignedNode *DataNode
+ for _, node := range n.children {
+ freeSpace := node.FreeSpace()
+ //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+ if freeSpace <= 0 {
+ continue
+ }
+ if r >= freeSpace {
+ r -= freeSpace
+ } else {
+ if node.IsDataNode() && node.FreeSpace() > 0 {
+ //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+ return true, node.(*DataNode)
+ }
+ ret, assignedNode = node.ReserveOneVolume(r, vid)
+ if ret {
+ break
+ }
+ }
+ }
+ return ret, assignedNode
+}
+
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
+ n.maxVolumeCount += maxVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
+ n.volumeCount += volumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
+ n.activeVolumeCount += activeVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+ if n.maxVolumeId < vid {
+ n.maxVolumeId = vid
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeId(vid)
+ }
+ }
+}
+func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+ return n.maxVolumeId
+}
+func (n *NodeImpl) GetVolumeCount() int {
+ return n.volumeCount
+}
+func (n *NodeImpl) GetActiveVolumeCount() int {
+ return n.activeVolumeCount
+}
+func (n *NodeImpl) GetMaxVolumeCount() int {
+ return n.maxVolumeCount
+}
+
+func (n *NodeImpl) LinkChildNode(node Node) {
+ if n.children[node.Id()] == nil {
+ n.children[node.Id()] = node
+ n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+ n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
+ n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
+ node.SetParent(n)
+ fmt.Println(n, "adds child", node.Id())
+ }
+}
+
+func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
+ node := n.children[nodeId]
+ node.SetParent(nil)
+ if node != nil {
+ delete(n.children, node.Id())
+ n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+ n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
+ n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
+ fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
+ }
+}
+
+func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
+ if n.IsRack() {
+ for _, c := range n.Children() {
+ dn := c.(*DataNode) //can not cast n to DataNode
+ if dn.LastSeen < freshThreshHold {
+ if !dn.Dead {
+ dn.Dead = true
+ n.GetTopology().chanDeadDataNodes <- dn
+ }
+ }
+ for _, v := range dn.volumes {
+ if uint64(v.Size) >= volumeSizeLimit {
+ //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+ n.GetTopology().chanFullVolumes <- v
+ }
+ }
+ }
+ } else {
+ for _, c := range n.Children() {
+ c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
+ }
+ }
+}
+
+func (n *NodeImpl) GetTopology() *Topology {
+ var p Node
+ p = n
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ return p.GetValue().(*Topology)
+}