author    Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> 2024-09-04 20:16:44 +0500
committer GitHub <noreply@github.com> 2024-09-04 08:16:44 -0700
commit    67a252ee8a0cbcf0f33cb7c94de21d4791ef1f39 (patch)
tree      36c494fa2bba916714cd0ac53e9070a4a1295d6b
parent    eb02946c977be57a0325d9ed86847699e99661c1 (diff)
[master] refactor func ShouldGrowVolumes (#5884)
-rw-r--r--  weed/server/master_grpc_server_volume.go | 47
-rw-r--r--  weed/stats/metrics.go                    | 19
-rw-r--r--  weed/topology/topology.go                | 11
-rw-r--r--  weed/topology/topology_info.go           | 11
-rw-r--r--  weed/topology/volume_growth.go           |  1
-rw-r--r--  weed/topology/volume_layout.go           | 94
6 files changed, 121 insertions, 62 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 8fc06bdb6..eb0d43705 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -27,7 +27,7 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
     newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
     glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
     if err != nil {
-        glog.Warningf("automatic volume grow %s: %+v", req.Option, err)
+        glog.V(1).Infof("automatic volume grow failed: %+v", err)
         return
     }
     for _, newVidLocation := range newVidLocations {
@@ -38,19 +38,37 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
 func (ms *MasterServer) ProcessGrowRequest() {
     go func() {
         for {
-            time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
             if !ms.Topo.IsLeader() {
                 continue
             }
-            for _, vl := range ms.Topo.ListVolumeLayouts() {
-                if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(&topology.VolumeGrowOption{}) {
+            dcs := ms.Topo.ListDataCenters()
+            for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
+                vl := vlc.VolumeLayout
+                if vl.HasGrowRequest() {
+                    continue
+                }
+                if vl.ShouldGrowVolumes(vlc.Collection) {
                     vl.AddGrowRequest()
                     ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
-                        Option: vl.ToGrowOption(),
+                        Option: vlc.ToGrowOption(),
                         Count:  vl.GetLastGrowCount(),
                     }
+                } else {
+                    for _, dc := range dcs {
+                        if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) {
+                            vl.AddGrowRequest()
+                            volumeGrowOption := vlc.ToGrowOption()
+                            volumeGrowOption.DataCenter = dc
+                            ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
+                                Option: volumeGrowOption,
+                                Count:  vl.GetLastGrowCount(),
+                                Force:  true,
+                            }
+                        }
+                    }
                 }
             }
+            time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
         }
     }()
     go func() {
@@ -81,19 +99,20 @@ func (ms *MasterServer) ProcessGrowRequest() {
             })
 
             // not atomic but it's okay
-            if !found && vl.ShouldGrowVolumes(option) {
-                filter.Store(req, nil)
-                // we have lock called inside vg
-                go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) {
-                    ms.DoAutomaticVolumeGrow(req)
-                    vl.DoneGrowRequest()
-                    filter.Delete(req)
-                }(req, vl)
-            } else {
+            if found || (!req.Force && !vl.ShouldGrowVolumes(req.Option.Collection)) {
                 glog.V(4).Infoln("discard volume grow request")
                 time.Sleep(time.Millisecond * 211)
                 vl.DoneGrowRequest()
+                continue
             }
+
+            filter.Store(req, nil)
+            // we have lock called inside vg
+            go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) {
+                ms.DoAutomaticVolumeGrow(req)
+                vl.DoneGrowRequest()
+                filter.Delete(req)
+            }(req, vl)
         }
     }()
 }
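The producer loop above is the heart of the change: instead of probing each volume layout once with an empty VolumeGrowOption, it walks layout/collection pairs and falls back to per-data-center checks. A minimal sketch of that control flow, using hypothetical stand-in types rather than the real weed/topology structs:

package grow

import (
    "math/rand"
    "time"
)

// Stand-ins for the real weed/topology types; only the control flow of
// the refactored producer loop is sketched here.
type VolumeGrowRequest struct {
    DataCenter string
    Count      uint32
    Force      bool
}

type layoutCollection struct {
    growRequested     bool
    writable, crowded int
    crowdedByDC       map[string]bool // hypothetical: true when a DC has no uncrowded writable volume
}

// shouldGrow mirrors the refactored ShouldGrowVolumes: grow once the
// crowded volumes have caught up with the writable ones.
func (l *layoutCollection) shouldGrow() bool { return l.writable <= l.crowded }

func produce(layouts []*layoutCollection, dcs []string, ch chan<- *VolumeGrowRequest) {
    for {
        for _, l := range layouts {
            if l.growRequested {
                continue
            }
            if l.shouldGrow() {
                l.growRequested = true
                ch <- &VolumeGrowRequest{Count: 1}
                continue
            }
            // Per-data-center fallback: one DC can run out of uncrowded
            // volumes while the layout as a whole still looks healthy, so
            // such requests carry Force to bypass the consumer's re-check.
            for _, dc := range dcs {
                if l.crowdedByDC[dc] {
                    l.growRequested = true
                    ch <- &VolumeGrowRequest{DataCenter: dc, Count: 1, Force: true}
                }
            }
        }
        // The sleep now follows the scan, so the first check runs right away.
        time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
    }
}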
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index 134485946..956bf4009 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -70,13 +70,21 @@ var (
Help: "replica placement mismatch",
}, []string{"collection", "id"})
- MasterVolumeLayout = prometheus.NewGaugeVec(
+ MasterVolumeLayoutWritable = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "master",
- Name: "volume_layout_total",
- Help: "Number of volumes in volume layouts",
- }, []string{"collection", "dataCenter", "type"})
+ Name: "volume_layout_writable",
+ Help: "Number of writable volumes in volume layouts",
+ }, []string{"collection", "disk", "rp", "ttl"})
+
+ MasterVolumeLayoutCrowded = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "master",
+ Name: "volume_layout_crowded",
+ Help: "Number of crowded volumes in volume layouts",
+ }, []string{"collection", "disk", "rp", "ttl"})
MasterPickForWriteErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
@@ -281,7 +289,8 @@ func init() {
     Gather.MustRegister(MasterReceivedHeartbeatCounter)
     Gather.MustRegister(MasterLeaderChangeCounter)
     Gather.MustRegister(MasterReplicaPlacementMismatch)
-    Gather.MustRegister(MasterVolumeLayout)
+    Gather.MustRegister(MasterVolumeLayoutWritable)
+    Gather.MustRegister(MasterVolumeLayoutCrowded)
 
     Gather.MustRegister(FilerRequestCounter)
     Gather.MustRegister(FilerHandlerCounter)
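The split replaces one gauge with a free-form "type" label by two gauges keyed on collection, disk type, replica placement, and TTL. A small usage sketch with the Prometheus Go client; the namespace and label values below are illustrative, not necessarily the ones SeaweedFS emits:

package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative gauge mirroring MasterVolumeLayoutWritable.
var volumeLayoutWritable = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Namespace: "SeaweedFS",
        Subsystem: "master",
        Name:      "volume_layout_writable",
        Help:      "Number of writable volumes in volume layouts",
    }, []string{"collection", "disk", "rp", "ttl"})

func main() {
    prometheus.MustRegister(volumeLayoutWritable)
    // One time series per (collection, disk, rp, ttl) combination.
    volumeLayoutWritable.WithLabelValues("pictures", "hdd", "010", "").Set(12)
}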
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 6bfd912cb..ba3be97c4 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -329,7 +329,7 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
 }
 
 func (t *Topology) DataCenterExists(dcName string) bool {
-    return dcName == "" || t.GetOrCreateDataCenter(dcName) != nil
+    return dcName == "" || t.GetDataCenter(dcName) != nil
 }
 
 func (t *Topology) GetDataCenter(dcName string) (dc *DataCenter) {
@@ -358,6 +358,15 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
     return dc
 }
 
+func (t *Topology) ListDataCenters() (dcs []string) {
+    t.RLock()
+    defer t.RUnlock()
+    for _, c := range t.children {
+        dcs = append(dcs, string(c.(*DataCenter).Id()))
+    }
+    return dcs
+}
+
 func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
     // convert into in memory struct storage.VolumeInfo
     var volumeInfos []storage.VolumeInfo
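The DataCenterExists fix matters because GetOrCreateDataCenter creates the node when it is missing, so the old check could never report false for a non-empty name. A lookup-only sketch with simplified stand-in types (not the real Topology internals):

package topo

import "sync"

// Simplified stand-ins for the real Topology and DataCenter nodes.
type DataCenter struct{ name string }

type Topology struct {
    sync.RWMutex
    children map[string]*DataCenter
}

// GetDataCenter is a pure lookup; unlike GetOrCreateDataCenter it
// returns nil for unknown names instead of materializing a new node.
func (t *Topology) GetDataCenter(name string) *DataCenter {
    t.RLock()
    defer t.RUnlock()
    return t.children[name]
}

func (t *Topology) DataCenterExists(name string) bool {
    return name == "" || t.GetDataCenter(name) != nil
}

// ListDataCenters snapshots the data center names under the read lock,
// matching the helper added in this commit.
func (t *Topology) ListDataCenters() (dcs []string) {
    t.RLock()
    defer t.RUnlock()
    for name := range t.children {
        dcs = append(dcs, name)
    }
    return dcs
}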
diff --git a/weed/topology/topology_info.go b/weed/topology/topology_info.go
index 5fa439a0b..33a047b1e 100644
--- a/weed/topology/topology_info.go
+++ b/weed/topology/topology_info.go
@@ -13,6 +13,11 @@ type TopologyInfo struct {
     Layouts []VolumeLayoutInfo `json:"Layouts"`
 }
 
+type VolumeLayoutCollection struct {
+    Collection   string
+    VolumeLayout *VolumeLayout
+}
+
 func (t *Topology) ToInfo() (info TopologyInfo) {
     info.Max = t.diskUsages.GetMaxVolumeCount()
     info.Free = t.diskUsages.FreeSpace()
@@ -42,10 +47,12 @@ func (t *Topology) ToInfo() (info TopologyInfo) {
     return
 }
 
-func (t *Topology) ListVolumeLayouts() (volumeLayouts []*VolumeLayout) {
+func (t *Topology) ListVolumeLayoutCollections() (volumeLayouts []*VolumeLayoutCollection) {
     for _, col := range t.collectionMap.Items() {
         for _, volumeLayout := range col.(*Collection).storageType2VolumeLayout.Items() {
-            volumeLayouts = append(volumeLayouts, volumeLayout.(*VolumeLayout))
+            volumeLayouts = append(volumeLayouts,
+                &VolumeLayoutCollection{col.(*Collection).Name, volumeLayout.(*VolumeLayout)},
+            )
         }
     }
     return volumeLayouts
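Returning the layout together with its collection name saves callers a reverse lookup when they need the name for metrics labels or grow options. A condensed sketch of the pairing, with plain maps standing in for the concurrent maps the real code uses:

package topo

// Stand-ins; the real code uses concurrent maps and richer types.
type VolumeLayout struct{}

type Collection struct {
    Name    string
    layouts map[string]*VolumeLayout // keyed by storage type
}

type VolumeLayoutCollection struct {
    Collection   string
    VolumeLayout *VolumeLayout
}

// ListVolumeLayoutCollections pairs every layout with its owning
// collection's name, so the name travels with the layout.
func ListVolumeLayoutCollections(cols []*Collection) (out []*VolumeLayoutCollection) {
    for _, c := range cols {
        for _, vl := range c.layouts {
            out = append(out, &VolumeLayoutCollection{c.Name, vl})
        }
    }
    return out
}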
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 7f8684753..70f0d9cd4 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -28,6 +28,7 @@ This package is created to resolve these replica placement issues:
 type VolumeGrowRequest struct {
     Option *VolumeGrowOption
     Count  uint32
+    Force  bool
 }
 
 type volumeGrowthStrategy struct {
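Force exists for the per-data-center requests queued by the producer loop: the consumer re-validates ordinary requests against the layout-wide state, which would wrongly discard a request that is only justified inside one data center. A tiny sketch of that gate, with hypothetical helper names:

package grow

// Stand-in for the real request type.
type VolumeGrowRequest struct {
    Collection string
    Force      bool
}

// shouldProcess mirrors the consumer-side gate: duplicates are always
// dropped, and an unforced request is additionally re-checked against
// the layout's current writable/crowded counts.
func shouldProcess(req *VolumeGrowRequest, duplicate bool, stillShouldGrow func(collection string) bool) bool {
    if duplicate {
        return false
    }
    return req.Force || stillShouldGrow(req.Collection)
}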
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 6ac4d63db..baa9b91d4 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -3,7 +3,7 @@ package topology
 import (
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/stats"
-    "math/rand/v2"
+    "math/rand"
     "sync"
     "sync/atomic"
     "time"
@@ -218,17 +218,21 @@ func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) {
 }
 
 func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
-    if vl.enoughCopies(vid) && vl.isAllWritable(vid) {
-        if !vl.oversizedVolumes.IsTrue(vid) {
-            vl.setVolumeWritable(vid)
-        }
+    isEnoughCopies := vl.enoughCopies(vid)
+    isAllWritable := vl.isAllWritable(vid)
+    isOversizedVolume := vl.oversizedVolumes.IsTrue(vid)
+    if isEnoughCopies && isAllWritable && !isOversizedVolume {
+        vl.setVolumeWritable(vid)
     } else {
-        if !vl.enoughCopies(vid) {
+        if !isEnoughCopies {
             glog.V(0).Infof("volume %d does not have enough copies", vid)
         }
-        if !vl.isAllWritable(vid) {
+        if !isAllWritable {
             glog.V(0).Infof("volume %d are not all writable", vid)
         }
+        if isOversizedVolume {
+            glog.V(1).Infof("volume %d are oversized", vid)
+        }
         glog.V(0).Infof("volume %d remove from writable", vid)
         vl.removeFromWritable(vid)
     }
@@ -254,6 +258,10 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
     return uint64(v.Size) >= vl.volumeSizeLimit
 }
 
+func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool {
+    return float64(v.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold
+}
+
 func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
     return !vl.isOversized(v) &&
         v.Version == needle.CurrentVersion &&
@@ -296,7 +304,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
         return 0, 0, nil, true, fmt.Errorf("%s in volume layout", noWritableVolumes)
     }
     if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
-        vid := vl.writables[rand.IntN(lenWriters)]
+        vid := vl.writables[rand.Intn(lenWriters)]
         locationList = vl.vid2location[vid]
         if locationList == nil || len(locationList.list) == 0 {
             return 0, 0, nil, false, fmt.Errorf("Strangely vid %s is on no machine!", vid.String())
@@ -351,40 +359,45 @@ func (vl *VolumeLayout) GetLastGrowCount() uint32 {
     return vl.lastGrowCount.Load()
 }
 
-func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
-    total, active, crowded := vl.GetActiveVolumeCount(option)
-    stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "total").Set(float64(total))
-    stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "active").Set(float64(active))
-    stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.DataCenter, "crowded").Set(float64(crowded))
-    //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
-    return active <= crowded
+func (vl *VolumeLayout) ShouldGrowVolumes(collection string) bool {
+    writable, crowded := vl.GetWritableVolumeCount()
+    stats.MasterVolumeLayoutWritable.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(writable))
+    stats.MasterVolumeLayoutCrowded.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(crowded))
+    return writable <= crowded
 }
 
-func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, active, crowded int) {
+func (vl *VolumeLayout) ShouldGrowVolumesByDataNode(nodeType string, dataNode string) bool {
     vl.accessLock.RLock()
-    defer vl.accessLock.RUnlock()
-    if option.DataCenter == "" {
-        return len(vl.writables), len(vl.writables), len(vl.crowded)
-    }
-    total = len(vl.writables)
-    for _, v := range vl.writables {
+    writables := make([]needle.VolumeId, len(vl.writables))
+    copy(writables, vl.writables)
+    vl.accessLock.RUnlock()
+
+    dataNodeId := NodeId(dataNode)
+    for _, v := range writables {
         for _, dn := range vl.vid2location[v].list {
-            if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
-                if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
-                    continue
-                }
-                if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
-                    continue
-                }
-                active++
-                info, _ := dn.GetVolumesById(v)
-                if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
-                    crowded++
+            dataNodeFound := false
+            switch nodeType {
+            case "DataCenter":
+                dataNodeFound = dn.GetDataCenter().Id() == dataNodeId
+            case "Rack":
+                dataNodeFound = dn.GetRack().Id() == dataNodeId
+            case "DataNode":
+                dataNodeFound = dn.Id() == dataNodeId
+            }
+            if dataNodeFound {
+                if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) {
+                    return false
                 }
             }
         }
     }
-    return
+    return true
+}
+
+func (vl *VolumeLayout) GetWritableVolumeCount() (active, crowded int) {
+    vl.accessLock.RLock()
+    defer vl.accessLock.RUnlock()
+    return len(vl.writables), len(vl.crowded)
 }
 
 func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
@@ -531,12 +544,13 @@ func (vl *VolumeLayout) ToInfo() (info VolumeLayoutInfo) {
     return
 }
 
-func (vl *VolumeLayout) ToGrowOption() (option *VolumeGrowOption) {
-    option = &VolumeGrowOption{}
-    option.ReplicaPlacement = vl.rp
-    option.Ttl = vl.ttl
-    option.DiskType = vl.diskType
-    return
+func (vlc *VolumeLayoutCollection) ToGrowOption() (option *VolumeGrowOption) {
+    return &VolumeGrowOption{
+        Collection:       vlc.Collection,
+        ReplicaPlacement: vlc.VolumeLayout.rp,
+        Ttl:              vlc.VolumeLayout.ttl,
+        DiskType:         vlc.VolumeLayout.diskType,
+    }
 }
 
 func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
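After the refactor, the layout-wide grow decision needs no per-option scanning at all: it compares two counters the layout already maintains, with crowdedness defined as exceeding a configured fraction of the volume size limit. A self-contained restatement of that logic; the 0.9 threshold is illustrative, the real value comes from VolumeGrowStrategy.Threshold:

package grow

// Illustrative threshold; the real one is VolumeGrowStrategy.Threshold.
const threshold = 0.9

// isCrowded mirrors isCrowdedVolume: a volume counts as crowded once its
// size passes threshold * sizeLimit.
func isCrowded(volumeSize, sizeLimit uint64) bool {
    return float64(volumeSize) > float64(sizeLimit)*threshold
}

// shouldGrow mirrors ShouldGrowVolumes: grow when the writable volumes
// are no more numerous than the crowded ones, i.e. the layout is about
// to run out of comfortable write targets.
func shouldGrow(writable, crowded int) bool {
    return writable <= crowded
}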