aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/node.go')
-rw-r--r--weed/topology/node.go205
1 files changed, 84 insertions, 121 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 572a89d4d..95d63972e 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -2,34 +2,25 @@ package topology
import (
"errors"
- "math/rand"
- "strings"
- "sync"
- "sync/atomic"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "math/rand"
+ "strings"
+ "sync"
)
type NodeId string
type Node interface {
Id() NodeId
String() string
- FreeSpace() int64
- ReserveOneVolume(r int64) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
- UpAdjustVolumeCountDelta(volumeCountDelta int64)
- UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
- UpAdjustEcShardCountDelta(ecShardCountDelta int64)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
+ AvailableSpaceFor(option *VolumeGrowOption) int64
+ ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
+ UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
UpAdjustMaxVolumeId(vid needle.VolumeId)
+ GetDiskUsages() *DiskUsages
- GetVolumeCount() int64
- GetEcShardCount() int64
- GetActiveVolumeCount() int64
- GetRemoteVolumeCount() int64
- GetMaxVolumeCount() int64
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -45,73 +36,81 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
- volumeCount int64
- remoteVolumeCount int64
- activeVolumeCount int64
- ecShardCount int64
- maxVolumeCount int64
- id NodeId
- parent Node
- sync.RWMutex // lock children
- children map[NodeId]Node
- maxVolumeId needle.VolumeId
+ diskUsages *DiskUsages
+ id NodeId
+ parent Node
+ sync.RWMutex // lock children
+ children map[NodeId]Node
+ maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
value interface{}
}
+func (n *NodeImpl) GetDiskUsages() *DiskUsages {
+ return n.diskUsages
+}
+
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
-func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
- candidates := make([]Node, 0, len(n.children))
+func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
+ var totalWeights int64
var errs []string
n.RLock()
+ candidates := make([]Node, 0, len(n.children))
+ candidatesWeights := make([]int64, 0, len(n.children))
+ //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
for _, node := range n.children {
- if err := filterFirstNodeFn(node); err == nil {
- candidates = append(candidates, node)
- } else {
- errs = append(errs, string(node.Id())+":"+err.Error())
+ if node.AvailableSpaceFor(option) <= 0 {
+ continue
}
+ totalWeights += node.AvailableSpaceFor(option)
+ candidates = append(candidates, node)
+ candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
}
n.RUnlock()
- if len(candidates) == 0 {
- return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
+ if len(candidates) < numberOfNodes {
+ glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
+ return nil, nil, errors.New("No enough data node found!")
}
- firstNode = candidates[rand.Intn(len(candidates))]
- glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
- restNodes = make([]Node, numberOfNodes-1)
- candidates = candidates[:0]
- n.RLock()
- for _, node := range n.children {
- if node.Id() == firstNode.Id() {
- continue
- }
- if node.FreeSpace() <= 0 {
- continue
+ //pick nodes randomly by weights, the node picked earlier has higher final weights
+ sortedCandidates := make([]Node, 0, len(candidates))
+ for i := 0; i < len(candidates); i++ {
+ weightsInterval := rand.Int63n(totalWeights)
+ lastWeights := int64(0)
+ for k, weights := range candidatesWeights {
+ if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
+ sortedCandidates = append(sortedCandidates, candidates[k])
+ candidatesWeights[k] = 0
+ totalWeights -= weights
+ break
+ }
+ lastWeights += weights
}
- glog.V(2).Infoln("select rest node candidate:", node.Id())
- candidates = append(candidates, node)
}
- n.RUnlock()
- glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- ret := len(restNodes) == 0
- for k, node := range candidates {
- if k < len(restNodes) {
- restNodes[k] = node
- if k == len(restNodes)-1 {
- ret = true
+
+ restNodes = make([]Node, 0, numberOfNodes-1)
+ ret := false
+ n.RLock()
+ for k, node := range sortedCandidates {
+ if err := filterFirstNodeFn(node); err == nil {
+ firstNode = node
+ if k >= numberOfNodes-1 {
+ restNodes = sortedCandidates[:numberOfNodes-1]
+ } else {
+ restNodes = append(restNodes, sortedCandidates[:k]...)
+ restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
}
+ ret = true
+ break
} else {
- r := rand.Intn(k + 1)
- if r < len(restNodes) {
- restNodes[r] = node
- }
+ errs = append(errs, string(node.Id())+":"+err.Error())
}
}
+ n.RUnlock()
if !ret {
- glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- err = errors.New("No enough data node found!")
+ return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
return
}
@@ -134,10 +133,14 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
-func (n *NodeImpl) FreeSpace() int64 {
- freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
- if n.ecShardCount > 0 {
- freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
+func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
+ return n.diskUsages.getOrCreateDisk(diskType)
+}
+func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
+ t := n.getOrCreateDisk(option.DiskType)
+ freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount
+ if t.ecShardCount > 0 {
+ freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1
}
return freeVolumeSlotCount
}
@@ -158,11 +161,11 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
+func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
- freeSpace := node.FreeSpace()
+ freeSpace := node.AvailableSpaceFor(option)
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
@@ -170,11 +173,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error)
if r >= freeSpace {
r -= freeSpace
} else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
+ if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
}
- assignedNode, err = node.ReserveOneVolume(r)
+ assignedNode, err = node.ReserveOneVolume(r, option)
if err == nil {
return
}
@@ -183,34 +186,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error)
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
- atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
- atomic.AddInt64(&n.volumeCount, volumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
+ for diskType, diskUsage := range deltaDiskUsages.usages {
+ existingDisk := n.getOrCreateDisk(diskType)
+ existingDisk.addDiskUsageCounts(diskUsage)
}
-}
-func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
- atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
if n.parent != nil {
- n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
- atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
- atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+ n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
@@ -224,33 +206,18 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
return n.maxVolumeId
}
-func (n *NodeImpl) GetVolumeCount() int64 {
- return n.volumeCount
-}
-func (n *NodeImpl) GetEcShardCount() int64 {
- return n.ecShardCount
-}
-func (n *NodeImpl) GetRemoteVolumeCount() int64 {
- return n.remoteVolumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int64 {
- return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int64 {
- return n.maxVolumeCount
-}
func (n *NodeImpl) LinkChildNode(node Node) {
n.Lock()
defer n.Unlock()
+ n.doLinkChildNode(node)
+}
+
+func (n *NodeImpl) doLinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
- n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+ n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
- n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
- n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
- n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
- n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
}
@@ -263,11 +230,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
- n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
- n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
- n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
- n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
- n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
+ n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
glog.V(0).Infoln(n, "removes", node.Id())
}
}