aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/node.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-16 02:47:02 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-16 02:47:02 -0800
commitf8446b42abd7f3c6c0a298dbbb8641e466891561 (patch)
tree84005ad6433f8f1d734624eba1e3c9166208f50f /weed/topology/node.go
parent71f0c195157b79223a3c8e35a57da10b7ff0720d (diff)
downloadseaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.tar.xz
seaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.zip
this can compile now!!!
Diffstat (limited to 'weed/topology/node.go')
-rw-r--r--weed/topology/node.go182
1 files changed, 44 insertions, 138 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go
index b5c2680dd..5275af64a 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -2,40 +2,25 @@ package topology
import (
"errors"
- "github.com/chrislusf/seaweedfs/weed/storage"
- "math/rand"
- "strings"
- "sync"
- "sync/atomic"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "math/rand"
+ "strings"
+ "sync"
)
type NodeId string
type Node interface {
Id() NodeId
String() string
- FreeSpace() int64
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
- UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64)
- UpAdjustVolumeCountDelta(volumeCountDelta int64)
- UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64)
- UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
- UpAdjustEcShardCountDelta(ecShardCountDelta int64)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
+ UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
UpAdjustMaxVolumeId(vid needle.VolumeId)
+ GetDiskUsages() *DiskUsages
- GetVolumeCount() int64
- GetSsdVolumeCount() int64
- GetEcShardCount() int64
- GetActiveVolumeCount() int64
- GetRemoteVolumeCount() int64
- GetMaxVolumeCount() int64
- GetMaxSsdVolumeCount() int64
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -51,24 +36,22 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
- volumeCount int64
- remoteVolumeCount int64
- ssdVolumeCount int64
- activeVolumeCount int64
- ecShardCount int64
- maxVolumeCount int64
- maxSsdVolumeCount int64
- id NodeId
- parent Node
- sync.RWMutex // lock children
- children map[NodeId]Node
- maxVolumeId needle.VolumeId
+ diskUsages *DiskUsages
+ id NodeId
+ parent Node
+ sync.RWMutex // lock children
+ children map[NodeId]Node
+ maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
value interface{}
}
+func (n *NodeImpl) GetDiskUsages() *DiskUsages {
+ return n.diskUsages
+}
+
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
var totalWeights int64
@@ -150,20 +133,14 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
-func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
- freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
- if option.DiskType == storage.SsdType {
- freeVolumeSlotCount = n.maxSsdVolumeCount - n.ssdVolumeCount
- }
- if n.ecShardCount > 0 {
- freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
- }
- return freeVolumeSlotCount
+func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
+ return n.diskUsages.getOrCreateDisk(diskType)
}
-func (n *NodeImpl) FreeSpace() int64 {
- freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount
- if n.ecShardCount > 0 {
- freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
+func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
+ t := n.getOrCreateDisk(option.DiskType)
+ freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount
+ if t.ecShardCount > 0 {
+ freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1
}
return freeVolumeSlotCount
}
@@ -209,67 +186,29 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
- if maxVolumeCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative
- if maxSsdVolumeCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
- if volumeCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.volumeCount, volumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
- if remoteVolumeCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative
- if ssdVolumeCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
- if ecShardCountDelta == 0 {
- return
- }
- atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
- if n.parent != nil {
- n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
+func (n *NodeImpl) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
+ deltaDiskUsages := newDiskUsages()
+ for diskType, maxVolumeCount := range maxVolumeCounts {
+ if maxVolumeCount == 0 {
+ // the volume server may have set the max to zero
+ continue
+ }
+ dt := types.DiskType(diskType)
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
+ currentDiskUsage := n.diskUsages.getOrCreateDisk(dt)
+ deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
+ deltaDiskUsages.getOrCreateDisk(dt).maxVolumeCount = int64(maxVolumeCount)
}
+ n.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
- if activeVolumeCountDelta == 0 {
- return
+
+func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
+ for diskType, diskUsage := range deltaDiskUsages.usages {
+ existingDisk := n.getOrCreateDisk(diskType)
+ existingDisk.addDiskUsageCounts(diskUsage)
}
- atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil {
- n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+ n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
@@ -283,41 +222,14 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
return n.maxVolumeId
}
-func (n *NodeImpl) GetVolumeCount() int64 {
- return n.volumeCount
-}
-func (n *NodeImpl) GetSsdVolumeCount() int64 {
- return n.ssdVolumeCount
-}
-func (n *NodeImpl) GetEcShardCount() int64 {
- return n.ecShardCount
-}
-func (n *NodeImpl) GetRemoteVolumeCount() int64 {
- return n.remoteVolumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int64 {
- return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int64 {
- return n.maxVolumeCount
-}
-func (n *NodeImpl) GetMaxSsdVolumeCount() int64 {
- return n.maxSsdVolumeCount
-}
func (n *NodeImpl) LinkChildNode(node Node) {
n.Lock()
defer n.Unlock()
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
- n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
- n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount())
+ n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
- n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
- n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount())
- n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
- n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
- n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
}
@@ -330,13 +242,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
- n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
- n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount())
- n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
- n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
- n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
- n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
- n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount())
+ n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
glog.V(0).Infoln(n, "removes", node.Id())
}
}