aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-10-10 10:00:30 -0700
committerchrislu <chris.lu@gmail.com>2024-10-10 10:00:30 -0700
commit35fd1e1c9af62df7ca50324a0db2983ac4af4176 (patch)
treedd0be4f5fb6799ec522094a82e37daf91525d015
parentb28b1a34025a2f2ed80883e245250d00783bfea7 (diff)
downloadseaweedfs-35fd1e1c9af62df7ca50324a0db2983ac4af4176.tar.xz
seaweedfs-35fd1e1c9af62df7ca50324a0db2983ac4af4176.zip
optimize memory usage for large number of volumes
1. unwrap the map to avoid extra map object creation 2. fix ec shard counting in UpdateEcShards
-rw-r--r--weed/topology/data_node.go17
-rw-r--r--weed/topology/data_node_ec.go18
-rw-r--r--weed/topology/disk.go7
-rw-r--r--weed/topology/disk_ec.go19
-rw-r--r--weed/topology/node.go20
-rw-r--r--weed/topology/topology_event_handling.go11
-rw-r--r--weed/topology/volume_growth_test.go7
7 files changed, 50 insertions, 49 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index de428d38f..3103dc207 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -83,8 +83,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
disk.DeleteVolumeById(vid)
deletedVolumes = append(deletedVolumes, v)
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+ deltaDiskUsage := &DiskUsageCounts{}
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
@@ -92,7 +91,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
if !v.ReadOnly {
deltaDiskUsage.activeVolumeCount = -1
}
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
}
for _, v := range actualVolumes {
@@ -120,8 +119,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
}
disk.DeleteVolumeById(v.Id)
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+ deltaDiskUsage := &DiskUsageCounts{}
deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
@@ -129,7 +127,7 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
if !v.ReadOnly {
deltaDiskUsage.activeVolumeCount = -1
}
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
for _, v := range newVolumes {
dn.doAddOrUpdateVolume(v)
@@ -143,7 +141,6 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
// the volume server may have set the max to zero
continue
}
- deltaDiskUsages := newDiskUsages()
dt := types.ToDiskType(diskType)
currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
currentDiskUsageMaxVolumeCount := atomic.LoadInt64(&currentDiskUsage.maxVolumeCount)
@@ -151,9 +148,9 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
continue
}
disk := dn.getOrCreateDisk(dt.String())
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
- deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta(dt, &DiskUsageCounts{
+ maxVolumeCount: int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount,
+ })
}
}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 100b44f59..839499982 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -26,12 +26,10 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
existingEcShards := dn.GetEcShards()
// find out the newShards and deletedShards
- var newShardCount, deletedShardCount int
for _, ecShards := range existingEcShards {
+ var newShardCount, deletedShardCount int
disk := dn.getOrCreateDisk(ecShards.DiskType)
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
vid := ecShards.VolumeId
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
@@ -52,8 +50,11 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
}
- deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ if (newShardCount - deletedShardCount) != 0 {
+ disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
+ ecShardCount: int64(newShardCount - deletedShardCount),
+ })
+ }
}
@@ -65,10 +66,9 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
newShards = append(newShards, ecShards)
disk := dn.getOrCreateDisk(ecShards.DiskType)
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
- deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
+ ecShardCount: int64(ecShards.ShardIdCount()),
+ })
}
if len(newShards) > 0 || len(deletedShards) > 0 {
diff --git a/weed/topology/disk.go b/weed/topology/disk.go
index 4597bfc29..6d789e34b 100644
--- a/weed/topology/disk.go
+++ b/weed/topology/disk.go
@@ -152,8 +152,7 @@ func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
}
func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+ deltaDiskUsage := &DiskUsageCounts{}
if oldV, ok := d.volumes[v.Id]; !ok {
d.volumes[v.Id] = v
deltaDiskUsage.volumeCount = 1
@@ -164,7 +163,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
deltaDiskUsage.activeVolumeCount = 1
}
d.UpAdjustMaxVolumeId(v.Id)
- d.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
isNew = true
} else {
if oldV.IsRemote() != v.IsRemote() {
@@ -174,7 +173,7 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
if oldV.IsRemote() {
deltaDiskUsage.remoteVolumeCount = -1
}
- d.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
d.volumes[v.Id] = v
diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go
index 4f950025f..1fea29272 100644
--- a/weed/topology/disk_ec.go
+++ b/weed/topology/disk_ec.go
@@ -29,10 +29,12 @@ func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
delta = existing.ShardBits.ShardIdCount() - oldCount
}
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
- deltaDiskUsage.ecShardCount = int64(delta)
- d.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ if delta == 0 {
+ return
+ }
+ d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{
+ ecShardCount: int64(delta),
+ })
}
@@ -45,10 +47,11 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
delta := existing.ShardBits.ShardIdCount() - oldCount
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
- deltaDiskUsage.ecShardCount = int64(delta)
- d.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ if delta != 0 {
+ d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{
+ ecShardCount: int64(delta),
+ })
+ }
if existing.ShardBits.ShardIdCount() == 0 {
delete(d.ecShards, s.VolumeId)
diff --git a/weed/topology/node.go b/weed/topology/node.go
index d33bbce2b..aa178b561 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -21,7 +21,7 @@ type Node interface {
String() string
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
- UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
+ UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetDiskUsages() *DiskUsages
@@ -214,13 +214,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
- for diskType, diskUsage := range deltaDiskUsages.usages {
- existingDisk := n.getOrCreateDisk(diskType)
- existingDisk.addDiskUsageCounts(diskUsage)
- }
+func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { //can be negative
+ existingDisk := n.getOrCreateDisk(diskType)
+ existingDisk.addDiskUsageCounts(diskUsage)
if n.parent != nil {
- n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
@@ -244,7 +242,9 @@ func (n *NodeImpl) LinkChildNode(node Node) {
func (n *NodeImpl) doLinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
- n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
+ for dt, du := range node.GetDiskUsages().usages {
+ n.UpAdjustDiskUsageDelta(dt, du)
+ }
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
@@ -258,7 +258,9 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
- n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
+ for dt, du := range node.GetDiskUsages().negative().usages {
+ n.UpAdjustDiskUsageDelta(dt, du)
+ }
glog.V(0).Infoln(n, "removes", node.Id())
}
}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index ff1c642a6..109f29ee0 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -65,10 +65,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
if !volumeInfo.ReadOnly {
disk := dn.getOrCreateDisk(volumeInfo.DiskType)
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(volumeInfo.DiskType))
- deltaDiskUsage.activeVolumeCount = -1
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta(types.ToDiskType(volumeInfo.DiskType), &DiskUsageCounts{
+ activeVolumeCount: -1,
+ })
}
}
@@ -96,7 +95,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
negativeUsages := dn.GetDiskUsages().negative()
- dn.UpAdjustDiskUsageDelta(negativeUsages)
+ for dt, du := range negativeUsages.usages {
+ dn.UpAdjustDiskUsageDelta(dt, du)
+ }
dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
if dn.Parent() != nil {
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index 1624ec32a..49dd0957c 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -122,10 +122,9 @@ func setup(topologyLayout string) *Topology {
}
disk := server.getOrCreateDisk("")
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
- deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{
+ maxVolumeCount: int64(serverMap["limit"].(float64)),
+ })
}
}