diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-16 02:47:02 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-16 02:47:02 -0800 |
| commit | f8446b42abd7f3c6c0a298dbbb8641e466891561 (patch) | |
| tree | 84005ad6433f8f1d734624eba1e3c9166208f50f /weed/topology/node.go | |
| parent | 71f0c195157b79223a3c8e35a57da10b7ff0720d (diff) | |
| download | seaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.tar.xz seaweedfs-f8446b42abd7f3c6c0a298dbbb8641e466891561.zip | |
this can compile now!!!
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 182 |
1 files changed, 44 insertions, 138 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index b5c2680dd..5275af64a 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -2,40 +2,25 @@ package topology import ( "errors" - "github.com/chrislusf/seaweedfs/weed/storage" - "math/rand" - "strings" - "sync" - "sync/atomic" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "math/rand" + "strings" + "sync" ) type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int64 AvailableSpaceFor(option *VolumeGrowOption) int64 ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) - UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) - UpAdjustVolumeCountDelta(volumeCountDelta int64) - UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) - UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) - UpAdjustEcShardCountDelta(ecShardCountDelta int64) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) + UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) UpAdjustMaxVolumeId(vid needle.VolumeId) + GetDiskUsages() *DiskUsages - GetVolumeCount() int64 - GetSsdVolumeCount() int64 - GetEcShardCount() int64 - GetActiveVolumeCount() int64 - GetRemoteVolumeCount() int64 - GetMaxVolumeCount() int64 - GetMaxSsdVolumeCount() int64 GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -51,24 +36,22 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - volumeCount int64 - remoteVolumeCount int64 - ssdVolumeCount int64 - activeVolumeCount int64 - ecShardCount int64 - maxVolumeCount int64 - maxSsdVolumeCount int64 - id NodeId - parent Node - sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId needle.VolumeId + diskUsages *DiskUsages + id NodeId + parent Node + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string value interface{} } +func (n *NodeImpl) GetDiskUsages() *DiskUsages { + return n.diskUsages +} + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { var totalWeights int64 @@ -150,20 +133,14 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { - freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount - if option.DiskType == storage.SsdType { - freeVolumeSlotCount = n.maxSsdVolumeCount - n.ssdVolumeCount - } - if n.ecShardCount > 0 { - freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 - } - return freeVolumeSlotCount +func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { + return n.diskUsages.getOrCreateDisk(diskType) } -func (n *NodeImpl) FreeSpace() int64 { - freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount - if n.ecShardCount > 0 { - freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 +func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { + t := n.getOrCreateDisk(option.DiskType) + freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount + if t.ecShardCount > 0 { + freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1 } return freeVolumeSlotCount } @@ -209,67 +186,29 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative - if maxVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative - if maxSsdVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative - if volumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.volumeCount, volumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative - if remoteVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative - if ssdVolumeCountDelta == 0 { - return - } - atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta) - if n.parent != nil { - n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative - if ecShardCountDelta == 0 { - return - } - atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) - if n.parent != nil { - n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) +func (n *NodeImpl) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { + deltaDiskUsages := newDiskUsages() + for diskType, maxVolumeCount := range maxVolumeCounts { + if maxVolumeCount == 0 { + // the volume server may have set the max to zero + continue + } + dt := types.DiskType(diskType) + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) + currentDiskUsage := n.diskUsages.getOrCreateDisk(dt) + deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount + deltaDiskUsages.getOrCreateDisk(dt).maxVolumeCount = int64(maxVolumeCount) } + n.UpAdjustDiskUsageDelta(deltaDiskUsages) } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative - if activeVolumeCountDelta == 0 { - return + +func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative + for diskType, diskUsage := range deltaDiskUsages.usages { + existingDisk := n.getOrCreateDisk(diskType) + existingDisk.addDiskUsageCounts(diskUsage) } - atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages) } } func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative @@ -283,41 +222,14 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int64 { - return n.volumeCount -} -func (n *NodeImpl) GetSsdVolumeCount() int64 { - return n.ssdVolumeCount -} -func (n *NodeImpl) GetEcShardCount() int64 { - return n.ecShardCount -} -func (n *NodeImpl) GetRemoteVolumeCount() int64 { - return n.remoteVolumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int64 { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int64 { - return n.maxVolumeCount -} -func (n *NodeImpl) GetMaxSsdVolumeCount() int64 { - return n.maxSsdVolumeCount -} func (n *NodeImpl) LinkChildNode(node Node) { n.Lock() defer n.Unlock() if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) - n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) glog.V(0).Infoln(n, "adds child", node.Id()) } @@ -330,13 +242,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { if node != nil { node.SetParent(nil) delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount()) - n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) - n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) - n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount()) + n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative()) glog.V(0).Infoln(n, "removes", node.Id()) } } |
