diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/topology/node.go | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 93 |
1 files changed, 58 insertions, 35 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index 572a89d4d..114417edf 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -62,56 +62,64 @@ type NodeImpl struct { } // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { - candidates := make([]Node, 0, len(n.children)) +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { + var totalWeights int64 var errs []string n.RLock() + candidates := make([]Node, 0, len(n.children)) + candidatesWeights := make([]int64, 0, len(n.children)) + //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { - candidates = append(candidates, node) - } else { - errs = append(errs, string(node.Id())+":"+err.Error()) + if node.FreeSpace() <= 0 { + continue } + totalWeights += node.FreeSpace() + candidates = append(candidates, node) + candidatesWeights = append(candidatesWeights, node.FreeSpace()) } n.RUnlock() - if len(candidates) == 0 { - return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes { + glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates") + return nil, nil, errors.New("No enough data node found!") } - firstNode = candidates[rand.Intn(len(candidates))] - glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) - restNodes = make([]Node, numberOfNodes-1) - candidates = candidates[:0] - n.RLock() - for _, node := range n.children { - if node.Id() == firstNode.Id() { - continue - } - if node.FreeSpace() <= 0 { - continue + //pick nodes randomly by weights, the node picked earlier has higher final weights + sortedCandidates := make([]Node, 0, len(candidates)) + for i := 0; i < len(candidates); i++ { + weightsInterval := rand.Int63n(totalWeights) + lastWeights := int64(0) + for k, weights := range candidatesWeights { + if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) { + sortedCandidates = append(sortedCandidates, candidates[k]) + candidatesWeights[k] = 0 + totalWeights -= weights + break + } + lastWeights += weights } - glog.V(2).Infoln("select rest node candidate:", node.Id()) - candidates = append(candidates, node) } - n.RUnlock() - glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") - ret := len(restNodes) == 0 - for k, node := range candidates { - if k < len(restNodes) { - restNodes[k] = node - if k == len(restNodes)-1 { - ret = true + + restNodes = make([]Node, 0, numberOfNodes-1) + ret := false + n.RLock() + for k, node := range sortedCandidates { + if err := filterFirstNodeFn(node); err == nil { + firstNode = node + if k >= numberOfNodes-1 { + restNodes = sortedCandidates[:numberOfNodes-1] + } else { + restNodes = append(restNodes, sortedCandidates[:k]...) + restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...) } + ret = true + break } else { - r := rand.Intn(k + 1) - if r < len(restNodes) { - restNodes[r] = node - } + errs = append(errs, string(node.Id())+":"+err.Error()) } } + n.RUnlock() if !ret { - glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") - err = errors.New("No enough data node found!") + return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) } return } @@ -184,30 +192,45 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) } func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + if maxVolumeCountDelta == 0 { + return + } atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + if volumeCountDelta == 0 { + return + } atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative + if remoteVolumeCountDelta == 0 { + return + } atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) } } func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative + if ecShardCountDelta == 0 { + return + } atomic.AddInt64(&n.ecShardCount, ecShardCountDelta) if n.parent != nil { n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta) } } func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + if activeVolumeCountDelta == 0 { + return + } atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) |
