aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/master_grpc_server_volume.go27
-rw-r--r--weed/topology/topology.go13
-rw-r--r--weed/topology/volume_layout.go31
3 files changed, 42 insertions, 29 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index cc9bc3c51..24e9c058c 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "math"
"math/rand/v2"
"strings"
"sync"
@@ -53,7 +54,7 @@ func (ms *MasterServer) ProcessGrowRequest() {
if !ms.Topo.IsLeader() {
continue
}
- dcs := ms.Topo.ListDataCenters()
+ dcs := ms.Topo.ListDCAndRacks()
var err error
for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
vl := vlc.VolumeLayout
@@ -74,22 +75,28 @@ func (ms *MasterServer) ProcessGrowRequest() {
case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
vgr.WritableVolumeCount = volumeGrowStepCount
_, err = ms.VolumeGrow(ctx, vgr)
- default:
- for _, dc := range dcs {
- if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) {
- vgr.DataCenter = dc
+ }
+ if err != nil {
+ glog.V(0).Infof("volume grow request failed: %+v", err)
+ }
+ writableVolumes := vl.CloneWritableVolumes()
+ for dcId, racks := range dcs {
+ for _, rackId := range racks {
+ if vl.ShouldGrowVolumesByDcAndRack(&writableVolumes, dcId, rackId) {
+ vgr.DataCenter = string(dcId)
+ vgr.Rack = string(rackId)
if lastGrowCount > 0 {
- vgr.WritableVolumeCount = uint32(int(lastGrowCount) / len(dcs))
+ vgr.WritableVolumeCount = uint32(math.Ceil(float64(lastGrowCount) / float64(len(dcs)*len(racks))))
} else {
vgr.WritableVolumeCount = volumeGrowStepCount
}
- _, err = ms.VolumeGrow(ctx, vgr)
+
+ if _, err = ms.VolumeGrow(ctx, vgr); err != nil {
+ glog.V(0).Infof("volume grow request for dc:%s rack:%s failed: %+v", dcId, rackId, err)
+ }
}
}
}
- if err != nil {
- glog.V(0).Infof("volume grow request failed: %+v", err)
- }
}
}
}()
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index e436b453a..be50eecdf 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -369,6 +369,19 @@ func (t *Topology) ListDataCenters() (dcs []string) {
return dcs
}
+func (t *Topology) ListDCAndRacks() (dcs map[NodeId][]NodeId) {
+ t.RLock()
+ defer t.RUnlock()
+ dcs = make(map[NodeId][]NodeId)
+ for _, dcNode := range t.children {
+ dcNodeId := dcNode.(*DataCenter).Id()
+ for _, rackNode := range dcNode.Children() {
+ dcs[dcNodeId] = append(dcs[dcNodeId], rackNode.(*Rack).Id())
+ }
+ }
+ return dcs
+}
+
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
// convert into in memory struct storage.VolumeInfo
var volumeInfos []storage.VolumeInfo
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 3a360ff99..94493a177 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -365,25 +365,10 @@ func (vl *VolumeLayout) ShouldGrowVolumes() bool {
return writable <= crowded
}
-func (vl *VolumeLayout) ShouldGrowVolumesByDataNode(nodeType string, dataNode string) bool {
- vl.accessLock.RLock()
- writables := make([]needle.VolumeId, len(vl.writables))
- copy(writables, vl.writables)
- vl.accessLock.RUnlock()
-
- dataNodeId := NodeId(dataNode)
- for _, v := range writables {
- for _, dn := range vl.vid2location[v].list {
- dataNodeFound := false
- switch nodeType {
- case "DataCenter":
- dataNodeFound = dn.GetDataCenter().Id() == dataNodeId
- case "Rack":
- dataNodeFound = dn.GetRack().Id() == dataNodeId
- case "DataNode":
- dataNodeFound = dn.Id() == dataNodeId
- }
- if dataNodeFound {
+func (vl *VolumeLayout) ShouldGrowVolumesByDcAndRack(writables *[]needle.VolumeId, dcId NodeId, rackId NodeId) bool {
+ for _, v := range *writables {
+ for _, dn := range vl.Lookup(v) {
+ if dn.GetDataCenter().Id() == dcId && dn.GetRack().Id() == rackId {
if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) {
return false
}
@@ -399,6 +384,14 @@ func (vl *VolumeLayout) GetWritableVolumeCount() (active, crowded int) {
return len(vl.writables), len(vl.crowded)
}
+func (vl *VolumeLayout) CloneWritableVolumes() (writables []needle.VolumeId) {
+ vl.accessLock.RLock()
+ writables = make([]needle.VolumeId, len(vl.writables))
+ copy(writables, vl.writables)
+ vl.accessLock.RUnlock()
+ return writables
+}
+
func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {