aboutsummaryrefslogtreecommitdiff
path: root/go/topology/node.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-03-02 22:16:54 -0800
committerChris Lu <chris.lu@gmail.com>2014-03-02 22:16:54 -0800
commit27c74a7e66558a4f9ce0d10621606dfed98a3abb (patch)
treef16eef19480fd51ccbef54c05d39c2eacf309e56 /go/topology/node.go
parentedae676913363bdd1e5a50bf0778fdcc3c6d6051 (diff)
downloadseaweedfs-27c74a7e66558a4f9ce0d10621606dfed98a3abb.tar.xz
seaweedfs-27c74a7e66558a4f9ce0d10621606dfed98a3abb.zip
Major:
change replication_type to ReplicaPlacement, hopefully cleaner code works for 9 possible ReplicaPlacement xyz x : number of copies on other data centers y : number of copies on other racks z : number of copies on current rack x y z each can be 0,1,2 Minor: weed server "-mdir" default to "-dir" if empty
Diffstat (limited to 'go/topology/node.go')
-rw-r--r--go/topology/node.go69
1 files changed, 57 insertions, 12 deletions
diff --git a/go/topology/node.go b/go/topology/node.go
index cfd6f6489..abe363b39 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -3,6 +3,8 @@ package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
+ "errors"
+ "math/rand"
)
type NodeId string
@@ -10,7 +12,7 @@ type Node interface {
Id() NodeId
String() string
FreeSpace() int
- ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
+ ReserveOneVolume(r int) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -47,6 +49,54 @@ type NodeImpl struct {
value interface{}
}
+// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
+func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) bool) (firstNode Node, restNodes []Node, err error) {
+ candidates := make([]Node, 0, len(n.children))
+ for _, node := range n.children {
+ if filterFirstNodeFn(node) {
+ candidates = append(candidates, node)
+ }
+ }
+ if len(candidates) == 0 {
+ return nil, nil, errors.New("No matching data node found!")
+ }
+ firstNode = candidates[rand.Intn(len(candidates))]
+ glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
+
+ restNodes = make([]Node, numberOfNodes-1)
+ candidates = candidates[:0]
+ for _, node := range n.children {
+ if node.Id() == firstNode.Id() {
+ continue
+ }
+ if node.FreeSpace() <= 0 {
+ continue
+ }
+ glog.V(2).Infoln("select rest node candidate:", node.Id())
+ candidates = append(candidates, node)
+ }
+ glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ ret := len(restNodes) == 0
+ for k, node := range candidates {
+ if k < len(restNodes) {
+ restNodes[k] = node
+ if k == len(restNodes)-1 {
+ ret = true
+ }
+ } else {
+ r := rand.Intn(k + 1)
+ if r < len(restNodes) {
+ restNodes[r] = node
+ }
+ }
+ }
+ if !ret {
+ glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ err = errors.New("Not enough data node found!")
+ }
+ return
+}
+
func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode"
}
@@ -80,32 +130,27 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
- ret := false
- var assignedNode *DataNode
+func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
for _, node := range n.children {
freeSpace := node.FreeSpace()
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
- if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
- continue
- }
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
+ return node.(*DataNode), nil
}
- ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
- if ret {
- break
+ assignedNode, err = node.ReserveOneVolume(r)
+ if err != nil {
+ return
}
}
}
- return ret, assignedNode
+ return
}
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative