diff options
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 66 |
1 files changed, 56 insertions, 10 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index 114417edf..b5c2680dd 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,6 +2,7 @@ package topology import ( "errors" + "github.com/chrislusf/seaweedfs/weed/storage" "math/rand" "strings" "sync" @@ -17,19 +18,24 @@ type Node interface { Id() NodeId String() string FreeSpace() int64 - ReserveOneVolume(r int64) (*DataNode, error) + AvailableSpaceFor(option *VolumeGrowOption) int64 + ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) UpAdjustEcShardCountDelta(ecShardCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustMaxVolumeId(vid needle.VolumeId) GetVolumeCount() int64 + GetSsdVolumeCount() int64 GetEcShardCount() int64 GetActiveVolumeCount() int64 GetRemoteVolumeCount() int64 GetMaxVolumeCount() int64 + GetMaxSsdVolumeCount() int64 GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -47,9 +53,11 @@ type Node interface { type NodeImpl struct { volumeCount int64 remoteVolumeCount int64 + ssdVolumeCount int64 activeVolumeCount int64 ecShardCount int64 maxVolumeCount int64 + maxSsdVolumeCount int64 id NodeId parent Node sync.RWMutex // lock children @@ -62,7 +70,7 @@ type NodeImpl struct { } // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { var totalWeights int64 var errs []string n.RLock() @@ -70,12 +78,12 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(d candidatesWeights := make([]int64, 0, len(n.children)) //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if node.FreeSpace() <= 0 { + if node.AvailableSpaceFor(option) <= 0 { continue } - totalWeights += node.FreeSpace() + totalWeights += node.AvailableSpaceFor(option) candidates = append(candidates, node) - candidatesWeights = append(candidatesWeights, node.FreeSpace()) + candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option)) } n.RUnlock() if len(candidates) < numberOfNodes { @@ -142,8 +150,18 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int64 { +func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount + if option.DiskType == storage.SsdType { + freeVolumeSlotCount = n.maxSsdVolumeCount - n.ssdVolumeCount + } + if n.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 + } + return freeVolumeSlotCount +} +func (n *NodeImpl) FreeSpace() int64 { + freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount if n.ecShardCount > 0 { freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 } @@ -166,11 +184,11 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { - freeSpace := node.FreeSpace() + freeSpace := node.AvailableSpaceFor(option) // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue @@ -178,11 +196,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) if r >= freeSpace { r -= freeSpace } else { - if node.IsDataNode() && node.FreeSpace() > 0 { + if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } - assignedNode, err = node.ReserveOneVolume(r) + assignedNode, err = node.ReserveOneVolume(r, option) if err == nil { return } @@ -200,6 +218,15 @@ func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //ca n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } +func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative + if maxSsdVolumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta) + if n.parent != nil { + n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta) + } +} func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative if volumeCountDelta == 0 { return @@ -218,6 +245,15 @@ func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) } } +func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative + if ssdVolumeCountDelta == 0 { + return + } + atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta) + if n.parent != nil { + n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta) + } +} func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative if ecShardCountDelta == 0 { return @@ -250,6 +286,9 @@ func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } +func (n *NodeImpl) GetSsdVolumeCount() int64 { + return n.ssdVolumeCount +} func (n *NodeImpl) GetEcShardCount() int64 { return n.ecShardCount } @@ -262,6 +301,9 @@ func (n *NodeImpl) GetActiveVolumeCount() int64 { func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } +func (n *NodeImpl) GetMaxSsdVolumeCount() int64 { + return n.maxSsdVolumeCount +} func (n *NodeImpl) LinkChildNode(node Node) { n.Lock() @@ -269,8 +311,10 @@ func (n *NodeImpl) LinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount()) n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) @@ -287,10 +331,12 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) delete(n.children, node.Id()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount()) n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount()) glog.V(0).Infoln(n, "removes", node.Id()) } } |
