path: root/weed/topology/node.go
author    Chris Lu <chrislusf@users.noreply.github.com>    2025-08-23 21:02:30 -0700
committer GitHub <noreply@github.com>    2025-08-23 21:02:30 -0700
commit    7acebf11ea4dff896a4386a6ff851f5b60b3774b (patch)
tree      d81725cf18d7e6a981995dfe8690a80bd11491ea /weed/topology/node.go
parent    91b88262d7a05b6865af748ef466f67c1f14eb76 (diff)
Master: volume assignment concurrency (#7159)
* volume assignment concurrency
* accurate tests
* ensure uniqueness
* reserve atomically
* address comments
* atomic
* ReserveOneVolumeForReservation
* duplicated
* Update weed/topology/node.go
* Update weed/topology/node.go
* atomic counter
* dedup
* select the appropriate functions based on the useReservations flag

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/topology/node.go')
-rw-r--r--   weed/topology/node.go | 179
1 file changed, 176 insertions(+), 3 deletions(-)
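The commit's core idea is to make the capacity check and the reservation a single atomic step, so concurrent volume assignments cannot oversubscribe a node. From the caller's side, the intended pattern with the new Node methods looks roughly like the sketch below; assignWithReservation and growVolumeOn are hypothetical illustrations, and only TryReserveCapacity and ReleaseReservedCapacity come from this diff.

	// Hypothetical caller-side flow; growVolumeOn is an assumed helper, not part of this commit.
	func assignWithReservation(n Node, diskType types.DiskType, count int64) error {
		reservationId, ok := n.TryReserveCapacity(diskType, count)
		if !ok {
			return fmt.Errorf("insufficient capacity on node %s for %d volumes", n.Id(), count)
		}
		// Release in all cases: on success the real disk usage counters
		// take over; on failure the reserved slots must free up again.
		defer n.ReleaseReservedCapacity(reservationId)
		return growVolumeOn(n, diskType, count)
	}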
diff --git a/weed/topology/node.go b/weed/topology/node.go
index aa178b561..60e7427af 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -2,6 +2,7 @@ package topology
import (
"errors"
+ "fmt"
"math/rand/v2"
"strings"
"sync"
@@ -16,15 +17,124 @@ import (
)
type NodeId string
+
+// CapacityReservation represents a temporary reservation of capacity
+type CapacityReservation struct {
+ reservationId string
+ diskType types.DiskType
+ count int64
+ createdAt time.Time
+}
+
+// CapacityReservations manages capacity reservations for a node
+type CapacityReservations struct {
+ sync.RWMutex
+ reservations map[string]*CapacityReservation
+ reservedCounts map[types.DiskType]int64
+}
+
+func newCapacityReservations() *CapacityReservations {
+ return &CapacityReservations{
+ reservations: make(map[string]*CapacityReservation),
+ reservedCounts: make(map[types.DiskType]int64),
+ }
+}
+
+func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
+ cr.Lock()
+ defer cr.Unlock()
+
+ return cr.doAddReservation(diskType, count)
+}
+
+func (cr *CapacityReservations) removeReservation(reservationId string) bool {
+ cr.Lock()
+ defer cr.Unlock()
+
+ if reservation, exists := cr.reservations[reservationId]; exists {
+ delete(cr.reservations, reservationId)
+ cr.decrementCount(reservation.diskType, reservation.count)
+ return true
+ }
+ return false
+}
+
+func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
+ cr.RLock()
+ defer cr.RUnlock()
+
+ return cr.reservedCounts[diskType]
+}
+
+// decrementCount is a helper to decrement reserved count and clean up zero entries
+func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
+ cr.reservedCounts[diskType] -= count
+ // Clean up zero counts to prevent map growth
+ if cr.reservedCounts[diskType] <= 0 {
+ delete(cr.reservedCounts, diskType)
+ }
+}
+
+// doAddReservation is a helper to add a reservation, assuming the lock is already held
+func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
+ now := time.Now()
+ reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
+ cr.reservations[reservationId] = &CapacityReservation{
+ reservationId: reservationId,
+ diskType: diskType,
+ count: count,
+ createdAt: now,
+ }
+ cr.reservedCounts[diskType] += count
+ return reservationId
+}
+
+// tryReserveAtomic atomically checks available space and reserves if possible
+func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
+ cr.Lock()
+ defer cr.Unlock()
+
+ // Check available space under lock
+ currentReserved := cr.reservedCounts[diskType]
+ availableSpace := availableSpaceFunc() - currentReserved
+
+ if availableSpace >= count {
+ // Create and add reservation atomically
+ return cr.doAddReservation(diskType, count), true
+ }
+
+ return "", false
+}
+
+func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
+ cr.Lock()
+ defer cr.Unlock()
+
+ now := time.Now()
+ for id, reservation := range cr.reservations {
+ if now.Sub(reservation.createdAt) > expirationDuration {
+ delete(cr.reservations, id)
+ cr.decrementCount(reservation.diskType, reservation.count)
+ glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
+ }
+ }
+}
+
type Node interface {
Id() NodeId
String() string
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
+ ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetDiskUsages() *DiskUsages
+ // Capacity reservation methods for avoiding race conditions
+ TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
+ ReleaseReservedCapacity(reservationId string)
+ AvailableSpaceForReservation(option *VolumeGrowOption) int64
+
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
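The value of tryReserveAtomic above is that the available-space read and the reservation insert happen under one lock, closing the check-then-act window where two concurrent assigners could both claim the last free slot. Here is a standalone sketch of that guarantee as a hypothetical test (not part of this commit; the import path and types.HardDriveType are assumed from the surrounding code):

	package topology

	import (
		"sync"
		"sync/atomic"
		"testing"

		"github.com/seaweedfs/seaweedfs/weed/storage/types"
	)

	func TestTryReserveAtomicSingleSlot(t *testing.T) {
		cr := newCapacityReservations()
		var wg sync.WaitGroup
		var successes atomic.Int64
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Every goroutine sees one raw free slot; the mutex inside
				// tryReserveAtomic ensures only one reservation wins it.
				if _, ok := cr.tryReserveAtomic(types.HardDriveType, 1, func() int64 { return 1 }); ok {
					successes.Add(1)
				}
			}()
		}
		wg.Wait()
		if got := successes.Load(); got != 1 {
			t.Fatalf("expected exactly 1 successful reservation, got %d", got)
		}
	}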
@@ -52,6 +162,9 @@ type NodeImpl struct {
//for rack, data center, topology
nodeType string
value interface{}
+
+ // capacity reservations to prevent race conditions during volume creation
+ capacityReservations *CapacityReservations
}
func (n *NodeImpl) GetDiskUsages() *DiskUsages {
@@ -164,6 +277,42 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
}
return freeVolumeSlotCount
}
+
+// AvailableSpaceForReservation returns available space considering existing reservations
+func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
+ baseAvailable := n.AvailableSpaceFor(option)
+ reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
+ return baseAvailable - reservedCount
+}
+
+// TryReserveCapacity attempts to atomically reserve capacity for volume creation
+func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
+ const reservationTimeout = 5 * time.Minute // TODO: make this configurable
+
+ // Clean up any expired reservations first
+ n.capacityReservations.cleanExpiredReservations(reservationTimeout)
+
+ // Atomically check and reserve space
+ option := &VolumeGrowOption{DiskType: diskType}
+ reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
+ return n.AvailableSpaceFor(option)
+ })
+
+ if success {
+ glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
+ }
+
+ return reservationId, success
+}
+
+// ReleaseReservedCapacity releases a previously reserved capacity
+func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
+ if n.capacityReservations.removeReservation(reservationId) {
+ glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
+ } else {
+ glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
+ }
+}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
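Taken together, the accounting is plain arithmetic: AvailableSpaceForReservation is the raw free-slot count minus outstanding reservations for that disk type, and the five-minute expiration in TryReserveCapacity is a safety net that reclaims reservations leaked by callers that never release. A small worked example with hypothetical numbers:

	// A data node with 5 raw free slots and 3 slots already reserved:
	//   n.AvailableSpaceFor(option)            == 5
	//   reserved count for option.DiskType     == 3
	//   n.AvailableSpaceForReservation(option) == 2
	id, ok := n.TryReserveCapacity(types.HardDriveType, 2) // ok == true; headroom drops to 0
	if ok {
		// ... grow volumes ...
		n.ReleaseReservedCapacity(id) // headroom returns to 2
	}
	// If the caller dies before releasing, the reservation expires after
	// 5 minutes and is swept by the next TryReserveCapacity call.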
@@ -186,10 +335,24 @@ func (n *NodeImpl) GetValue() interface{} {
}
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+ return n.reserveOneVolumeInternal(r, option, false)
+}
+
+// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
+func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+ return n.reserveOneVolumeInternal(r, option, true)
+}
+
+func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
- freeSpace := node.AvailableSpaceFor(option)
+ var freeSpace int64
+ if useReservations {
+ freeSpace = node.AvailableSpaceForReservation(option)
+ } else {
+ freeSpace = node.AvailableSpaceFor(option)
+ }
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
@@ -197,7 +360,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
if r >= freeSpace {
r -= freeSpace
} else {
- if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
+ var hasSpace bool
+ if useReservations {
+ hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
+ } else {
+ hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
+ }
+ if hasSpace {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
dn := node.(*DataNode)
if dn.IsTerminating {
@@ -205,7 +374,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
}
return dn, nil
}
- assignedNode, err = node.ReserveOneVolume(r, option)
+ if useReservations {
+ assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
+ } else {
+ assignedNode, err = node.ReserveOneVolume(r, option)
+ }
if err == nil {
return
}
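For context, reserveOneVolumeInternal preserves the original weighted-random selection: the caller draws r uniformly from [0, totalFreeSpace), and the loop lets each child consume a slice of that range proportional to its free space, so nodes with more headroom are chosen more often. A stripped-down standalone sketch of that walk, using a plain slice in place of child nodes:

	// pickWeighted returns the index chosen by the subtract-and-compare walk
	// used in reserveOneVolumeInternal: r is uniform in [0, sum(freeSpace)).
	func pickWeighted(freeSpace []int64, r int64) int {
		for i, fs := range freeSpace {
			if fs <= 0 {
				continue
			}
			if r >= fs {
				r -= fs // r falls past this child's slice of the range
				continue
			}
			return i // r landed inside this child's slice
		}
		return -1 // r exceeded the total free space
	}

	// Example: children with free space [3, 0, 2]; r drawn from [0, 5).
	// r in [0,3) -> child 0 (probability 3/5); r in [3,5) -> child 2 (probability 2/5).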