diff options
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 150 |
1 files changed, 45 insertions, 105 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index 114417edf..95d63972e 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,34 +2,25 @@ package topology import ( "errors" - "math/rand" - "strings" - "sync" - "sync/atomic" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "math/rand" + "strings" + "sync" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int64 - ReserveOneVolume(r int64) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) - UpAdjustVolumeCountDelta(volumeCountDelta int64) - UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) - UpAdjustEcShardCountDelta(ecShardCountDelta int64) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + AvailableSpaceFor(option *VolumeGrowOption) int64 + ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) + UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) UpAdjustMaxVolumeId(vid needle.VolumeId) + GetDiskUsages() *DiskUsages - GetVolumeCount() int64 - GetEcShardCount() int64 - GetActiveVolumeCount() int64 - GetRemoteVolumeCount() int64 - GetMaxVolumeCount() int64 GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -45,24 +36,24 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - volumeCount int64 - remoteVolumeCount int64 - activeVolumeCount int64 - ecShardCount int64 - maxVolumeCount int64 - id NodeId - parent Node - sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId needle.VolumeId + diskUsages *DiskUsages + id NodeId + parent Node + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string value interface{} } +func (n *NodeImpl) GetDiskUsages() *DiskUsages { + return n.diskUsages +} + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { var totalWeights int64 var errs []string n.RLock() @@ -70,12 +61,12 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(d candidatesWeights := make([]int64, 0, len(n.children)) //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if node.FreeSpace() <= 0 { + if node.AvailableSpaceFor(option) <= 0 { continue } - totalWeights += node.FreeSpace() + totalWeights += node.AvailableSpaceFor(option) candidates = append(candidates, node) - candidatesWeights = append(candidatesWeights, node.FreeSpace()) + candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option)) } n.RUnlock() if len(candidates) < numberOfNodes { @@ -142,10 +133,14 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int64 { - freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount - if n.ecShardCount > 0 { - freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 +func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + return n.diskUsages.getOrCreateDisk(diskType) +} +func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { + t := n.getOrCreateDisk(option.DiskType) + freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount + if t.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1 } return freeVolumeSlotCount } @@ -166,11 +161,11 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { - freeSpace := node.FreeSpace() + freeSpace := node.AvailableSpaceFor(option) // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue @@ -178,11 +173,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) if r >= freeSpace { r -= freeSpace } else { - if node.IsDataNode() && node.FreeSpace() > 0 { + if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } - assignedNode, err = node.ReserveOneVolume(r) + assignedNode, err = node.ReserveOneVolume(r, option) if err == nil { return } @@ -191,49 +186,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative - if maxVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative - if volumeCountDelta == 0 { - return +func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative + for diskType, diskUsage := range deltaDiskUsages.usages { + existingDisk := n.getOrCreateDisk(diskType) + existingDisk.addDiskUsageCounts(diskUsage) } - atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative - if remoteVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative - if ecShardCountDelta == 0 { - return - } - atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) - if n.parent != nil { - n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) - } -} -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative - if activeVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages) } } func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative @@ -247,33 +206,18 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int64 { - return n.volumeCount -} -func (n *NodeImpl) GetEcShardCount() int64 { - return n.ecShardCount -} -func (n *NodeImpl) GetRemoteVolumeCount() int64 { - return n.remoteVolumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int64 { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int64 { - return n.maxVolumeCount -} func (n *NodeImpl) LinkChildNode(node Node) { n.Lock() defer n.Unlock() + n.doLinkChildNode(node) +} + +func (n *NodeImpl) doLinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) } @@ -286,11 +230,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { if node != nil { node.SetParent(nil) delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative()) glog.V(0).Infoln(n, "removes", node.Id()) } } |
