diff options
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 75 |
1 files changed, 47 insertions, 28 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..b2808f589 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -5,26 +5,30 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int - ReserveOneVolume(r int) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) - UpAdjustMaxVolumeId(vid storage.VolumeId) + FreeSpace() int64 + ReserveOneVolume(r int64) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustEcShardCountDelta(ecShardCountDelta int64) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + UpAdjustMaxVolumeId(vid needle.VolumeId) - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int - GetMaxVolumeId() storage.VolumeId + GetVolumeCount() int64 + GetEcShardCount() int64 + GetActiveVolumeCount() int64 + GetMaxVolumeCount() int64 + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) @@ -39,14 +43,15 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { + volumeCount int64 + activeVolumeCount int64 + ecShardCount int64 + maxVolumeCount int64 id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int parent Node sync.RWMutex // lock children children map[NodeId]Node - maxVolumeId storage.VolumeId + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -126,7 +131,10 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int { +func (n *NodeImpl) FreeSpace() int64 { + if n.ecShardCount > 0 { + return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 + } return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { @@ -146,7 +154,7 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { @@ -171,25 +179,31 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta +func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative + atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) + if n.parent != nil { + n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) + } +} +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative +func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid if n.parent != nil { @@ -197,16 +211,19 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative } } } -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int { +func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } -func (n *NodeImpl) GetActiveVolumeCount() int { +func (n *NodeImpl) GetEcShardCount() int64 { + return n.ecShardCount +} +func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } -func (n *NodeImpl) GetMaxVolumeCount() int { +func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } @@ -218,6 +235,7 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) @@ -232,6 +250,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) delete(n.children, node.Id()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) glog.V(0).Infoln(n, "removes", node.Id()) |
