| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-23 21:02:30 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-23 21:02:30 -0700 |
| commit | 7acebf11ea4dff896a4386a6ff851f5b60b3774b (patch) | |
| tree | d81725cf18d7e6a981995dfe8690a80bd11491ea /weed/topology/node.go | |
| parent | 91b88262d7a05b6865af748ef466f67c1f14eb76 (diff) | |
| download | seaweedfs-7acebf11ea4dff896a4386a6ff851f5b60b3774b.tar.xz seaweedfs-7acebf11ea4dff896a4386a6ff851f5b60b3774b.zip | |
Master: volume assignment concurrency (#7159)
* volume assignment concurrency
* accurate tests
* ensure uniqueness
* reserve atomically
* address comments
* atomic
* ReserveOneVolumeForReservation
* duplicated
* Update weed/topology/node.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update weed/topology/node.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* atomic counter
* dedup
* select the appropriate functions based on the useReservations flag
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
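
The commit message compresses the motivation: without reservations, two concurrent assigners can both pass the free-slot check before either registers a volume. A toy illustration of that check-then-act race (not SeaweedFS code; names and counts are invented for the example):

```go
package main

import (
	"fmt"
	"sync"
)

// Toy illustration of the check-then-act race this patch closes: both
// assigners observe freeSlots == 1 before either one decrements it, so a
// single free slot can be handed out twice.
func main() {
	var mu sync.Mutex
	freeSlots := 1
	assigned := 0

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			ok := freeSlots > 0 // the capacity check...
			mu.Unlock()
			// ...and the assignment happen under separate lock scopes,
			// so another goroutine can pass the same check in between.
			if ok {
				mu.Lock()
				assigned++
				freeSlots--
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("assigned:", assigned, "freeSlots:", freeSlots) // may print 2 and -1
}
```

The `tryReserveAtomic` helper in the diff below fixes exactly this by doing the check and the reservation under one lock acquisition.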
Diffstat (limited to 'weed/topology/node.go')
| -rw-r--r-- | weed/topology/node.go | 179 |
1 file changed, 176 insertions, 3 deletions
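
Before the diff itself, a minimal sketch of how assignment code might drive the new API. Only `TryReserveCapacity`, `ReleaseReservedCapacity`, and `Id` come from the patched `Node` interface; `growOneVolume`, `createVolume`, and the import paths are hypothetical and assume the current seaweedfs module layout:

```go
package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/topology"
)

// growOneVolume is a hypothetical caller of the new reservation API.
func growOneVolume(node topology.Node, diskType types.DiskType, createVolume func(topology.Node) error) error {
	// Atomically check-and-reserve one slot; this replaces a bare
	// AvailableSpaceFor check that could race with other assigners.
	reservationId, ok := node.TryReserveCapacity(diskType, 1)
	if !ok {
		return fmt.Errorf("no free volume slot on node %s", node.Id())
	}
	// Release the reservation whether creation succeeds or fails: once the
	// volume exists it is counted in the node's real disk usage, and expired
	// reservations are cleaned up after a timeout as a safety net anyway.
	defer node.ReleaseReservedCapacity(reservationId)

	return createVolume(node)
}
```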
```diff
diff --git a/weed/topology/node.go b/weed/topology/node.go
index aa178b561..60e7427af 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -2,6 +2,7 @@ package topology
 
 import (
 	"errors"
+	"fmt"
 	"math/rand/v2"
 	"strings"
 	"sync"
@@ -16,15 +17,124 @@ import (
 )
 
 type NodeId string
+
+// CapacityReservation represents a temporary reservation of capacity
+type CapacityReservation struct {
+	reservationId string
+	diskType      types.DiskType
+	count         int64
+	createdAt     time.Time
+}
+
+// CapacityReservations manages capacity reservations for a node
+type CapacityReservations struct {
+	sync.RWMutex
+	reservations   map[string]*CapacityReservation
+	reservedCounts map[types.DiskType]int64
+}
+
+func newCapacityReservations() *CapacityReservations {
+	return &CapacityReservations{
+		reservations:   make(map[string]*CapacityReservation),
+		reservedCounts: make(map[types.DiskType]int64),
+	}
+}
+
+func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
+	cr.Lock()
+	defer cr.Unlock()
+
+	return cr.doAddReservation(diskType, count)
+}
+
+func (cr *CapacityReservations) removeReservation(reservationId string) bool {
+	cr.Lock()
+	defer cr.Unlock()
+
+	if reservation, exists := cr.reservations[reservationId]; exists {
+		delete(cr.reservations, reservationId)
+		cr.decrementCount(reservation.diskType, reservation.count)
+		return true
+	}
+	return false
+}
+
+func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
+	cr.RLock()
+	defer cr.RUnlock()
+
+	return cr.reservedCounts[diskType]
+}
+
+// decrementCount is a helper to decrement reserved count and clean up zero entries
+func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
+	cr.reservedCounts[diskType] -= count
+	// Clean up zero counts to prevent map growth
+	if cr.reservedCounts[diskType] <= 0 {
+		delete(cr.reservedCounts, diskType)
+	}
+}
+
+// doAddReservation is a helper to add a reservation, assuming the lock is already held
+func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
+	now := time.Now()
+	reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
+	cr.reservations[reservationId] = &CapacityReservation{
+		reservationId: reservationId,
+		diskType:      diskType,
+		count:         count,
+		createdAt:     now,
+	}
+	cr.reservedCounts[diskType] += count
+	return reservationId
+}
+
+// tryReserveAtomic atomically checks available space and reserves if possible
+func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
+	cr.Lock()
+	defer cr.Unlock()
+
+	// Check available space under lock
+	currentReserved := cr.reservedCounts[diskType]
+	availableSpace := availableSpaceFunc() - currentReserved
+
+	if availableSpace >= count {
+		// Create and add reservation atomically
+		return cr.doAddReservation(diskType, count), true
+	}
+
+	return "", false
+}
+
+func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
+	cr.Lock()
+	defer cr.Unlock()
+
+	now := time.Now()
+	for id, reservation := range cr.reservations {
+		if now.Sub(reservation.createdAt) > expirationDuration {
+			delete(cr.reservations, id)
+			cr.decrementCount(reservation.diskType, reservation.count)
+			glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
+		}
+	}
+}
 
 type Node interface {
 	Id() NodeId
 	String() string
 	AvailableSpaceFor(option *VolumeGrowOption) int64
 	ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
+	ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
 	UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
 	UpAdjustMaxVolumeId(vid needle.VolumeId)
 	GetDiskUsages() *DiskUsages
+
+	// Capacity reservation methods for avoiding race conditions
+	TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
+	ReleaseReservedCapacity(reservationId string)
+	AvailableSpaceForReservation(option *VolumeGrowOption) int64
+
 	GetMaxVolumeId() needle.VolumeId
 	SetParent(Node)
 	LinkChildNode(node Node)
@@ -52,6 +162,9 @@ type NodeImpl struct {
 	//for rack, data center, topology
 	nodeType string
 	value    interface{}
+
+	// capacity reservations to prevent race conditions during volume creation
+	capacityReservations *CapacityReservations
 }
 
 func (n *NodeImpl) GetDiskUsages() *DiskUsages {
@@ -164,6 +277,42 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
 	}
 	return freeVolumeSlotCount
 }
+
+// AvailableSpaceForReservation returns available space considering existing reservations
+func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
+	baseAvailable := n.AvailableSpaceFor(option)
+	reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
+	return baseAvailable - reservedCount
+}
+
+// TryReserveCapacity attempts to atomically reserve capacity for volume creation
+func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
+	const reservationTimeout = 5 * time.Minute // TODO: make this configurable
+
+	// Clean up any expired reservations first
+	n.capacityReservations.cleanExpiredReservations(reservationTimeout)
+
+	// Atomically check and reserve space
+	option := &VolumeGrowOption{DiskType: diskType}
+	reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
+		return n.AvailableSpaceFor(option)
+	})
+
+	if success {
+		glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
+	}
+
+	return reservationId, success
+}
+
+// ReleaseReservedCapacity releases a previously reserved capacity
+func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
+	if n.capacityReservations.removeReservation(reservationId) {
+		glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
+	} else {
+		glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
+	}
+}
 
 func (n *NodeImpl) SetParent(node Node) {
 	n.parent = node
@@ -186,10 +335,24 @@ func (n *NodeImpl) GetValue() interface{} {
 }
 
 func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+	return n.reserveOneVolumeInternal(r, option, false)
+}
+
+// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
+func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+	return n.reserveOneVolumeInternal(r, option, true)
+}
+
+func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
 	n.RLock()
 	defer n.RUnlock()
 	for _, node := range n.children {
-		freeSpace := node.AvailableSpaceFor(option)
+		var freeSpace int64
+		if useReservations {
+			freeSpace = node.AvailableSpaceForReservation(option)
+		} else {
+			freeSpace = node.AvailableSpaceFor(option)
+		}
 		// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
 		if freeSpace <= 0 {
 			continue
@@ -197,7 +360,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
 		if r >= freeSpace {
 			r -= freeSpace
 		} else {
-			if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
+			var hasSpace bool
+			if useReservations {
+				hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
+			} else {
+				hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
+			}
+			if hasSpace {
 				// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
 				dn := node.(*DataNode)
 				if dn.IsTerminating {
@@ -205,7 +374,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
 				}
 				return dn, nil
 			}
-			assignedNode, err = node.ReserveOneVolume(r, option)
+			if useReservations {
+				assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
+			} else {
+				assignedNode, err = node.ReserveOneVolume(r, option)
+			}
 			if err == nil {
 				return
 			}
```
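
A note on the `r` parameter threaded through `reserveOneVolumeInternal`: it implements a weighted random pick over the children, and the patch keeps that walk intact, only swapping in the reservation-aware space function. A standalone toy version of the same walk, assuming the caller draws `r` uniformly from `[0, total free slots)` as the topology code does:

```go
package main

import (
	"fmt"
	"math/rand/v2"
)

// pickWeighted mirrors the r-walk in reserveOneVolumeInternal: subtracting
// each child's free space from r until r falls inside a child selects that
// child with probability proportional to its free slots.
func pickWeighted(freeSlots []int64) int {
	var total int64
	for _, f := range freeSlots {
		total += f
	}
	r := rand.Int64N(total) // caller must guarantee total > 0
	for i, f := range freeSlots {
		if f <= 0 {
			continue
		}
		if r >= f {
			r -= f // skip past this child's share of the range
			continue
		}
		return i // r landed inside child i's share
	}
	return -1 // unreachable when total > 0
}

func main() {
	fmt.Println(pickWeighted([]int64{3, 0, 7})) // index 2 about 70% of the time
}
```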
