author     Chris Lu <chrislusf@users.noreply.github.com>    2025-08-23 21:02:30 -0700
committer  GitHub <noreply@github.com>                      2025-08-23 21:02:30 -0700
commit     7acebf11ea4dff896a4386a6ff851f5b60b3774b
tree       d81725cf18d7e6a981995dfe8690a80bd11491ea
parent     91b88262d7a05b6865af748ef466f67c1f14eb76
Master: volume assignment concurrency (#7159)
* volume assignment concurrency
* accurate tests
* ensure uniqueness
* reserve atomically
* address comments
* atomic
* ReserveOneVolumeForReservation
* duplicated
* Update weed/topology/node.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update weed/topology/node.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* atomic counter
* dedup
* select the appropriate functions based on the useReservations flag
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
-rw-r--r--  weed/topology/capacity_reservation_test.go        215
-rw-r--r--  weed/topology/data_center.go                         4
-rw-r--r--  weed/topology/data_node.go                           1
-rw-r--r--  weed/topology/node.go                              179
-rw-r--r--  weed/topology/race_condition_stress_test.go        306
-rw-r--r--  weed/topology/rack.go                                8
-rw-r--r--  weed/topology/topology.go                            1
-rw-r--r--  weed/topology/volume_growth.go                     122
-rw-r--r--  weed/topology/volume_growth_reservation_test.go    276
-rw-r--r--  weed/topology/volume_growth_test.go                  6
10 files changed, 1086 insertions, 32 deletions
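
For context, a minimal sketch (not part of the patch) of how the new per-node reservation API is meant to be used: reserve a slot, create the volume, then release the reservation once the volume is registered. The setup mirrors the unit tests in this patch; the function name TestReservationFlowSketch and the capacity numbers are illustrative only.

package topology

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// Illustrative sketch: reserve one slot, simulate volume creation, then release.
func TestReservationFlowSketch(t *testing.T) {
	dn := NewDataNode("server1")
	diskType := types.HardDriveType

	// Give the node capacity for 10 volumes, as the tests in this patch do.
	diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
	diskUsage.maxVolumeCount = 10

	// Atomically check capacity and reserve one slot.
	reservationId, ok := dn.TryReserveCapacity(diskType, 1)
	if !ok {
		t.Fatal("expected capacity to be available")
	}

	// ... the caller would allocate and register the volume here ...

	// Release the reservation once the volume is registered (or on failure).
	dn.ReleaseReservedCapacity(reservationId)
}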
diff --git a/weed/topology/capacity_reservation_test.go b/weed/topology/capacity_reservation_test.go
new file mode 100644
index 000000000..38cb14c50
--- /dev/null
+++ b/weed/topology/capacity_reservation_test.go
@@ -0,0 +1,215 @@
+package topology
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+func TestCapacityReservations_BasicOperations(t *testing.T) {
+ cr := newCapacityReservations()
+ diskType := types.HardDriveType
+
+ // Test initial state
+ if count := cr.getReservedCount(diskType); count != 0 {
+ t.Errorf("Expected 0 reserved count initially, got %d", count)
+ }
+
+ // Test add reservation
+ reservationId := cr.addReservation(diskType, 5)
+ if reservationId == "" {
+ t.Error("Expected non-empty reservation ID")
+ }
+
+ if count := cr.getReservedCount(diskType); count != 5 {
+ t.Errorf("Expected 5 reserved count, got %d", count)
+ }
+
+ // Test multiple reservations
+ cr.addReservation(diskType, 3)
+ if count := cr.getReservedCount(diskType); count != 8 {
+ t.Errorf("Expected 8 reserved count after second reservation, got %d", count)
+ }
+
+ // Test remove reservation
+ success := cr.removeReservation(reservationId)
+ if !success {
+ t.Error("Expected successful removal of existing reservation")
+ }
+
+ if count := cr.getReservedCount(diskType); count != 3 {
+ t.Errorf("Expected 3 reserved count after removal, got %d", count)
+ }
+
+ // Test remove non-existent reservation
+ success = cr.removeReservation("non-existent-id")
+ if success {
+ t.Error("Expected failure when removing non-existent reservation")
+ }
+}
+
+func TestCapacityReservations_ExpiredCleaning(t *testing.T) {
+ cr := newCapacityReservations()
+ diskType := types.HardDriveType
+
+ // Add reservations and manipulate their creation time
+ reservationId1 := cr.addReservation(diskType, 3)
+ reservationId2 := cr.addReservation(diskType, 2)
+
+ // Make one reservation "old"
+ cr.Lock()
+ if reservation, exists := cr.reservations[reservationId1]; exists {
+ reservation.createdAt = time.Now().Add(-10 * time.Minute) // 10 minutes ago
+ }
+ cr.Unlock()
+
+ // Clean expired reservations (5 minute expiration)
+ cr.cleanExpiredReservations(5 * time.Minute)
+
+ // Only the non-expired reservation should remain
+ if count := cr.getReservedCount(diskType); count != 2 {
+ t.Errorf("Expected 2 reserved count after cleaning, got %d", count)
+ }
+
+ // Verify the right reservation was kept
+ if !cr.removeReservation(reservationId2) {
+ t.Error("Expected recent reservation to still exist")
+ }
+
+ if cr.removeReservation(reservationId1) {
+ t.Error("Expected old reservation to be cleaned up")
+ }
+}
+
+func TestCapacityReservations_DifferentDiskTypes(t *testing.T) {
+ cr := newCapacityReservations()
+
+ // Add reservations for different disk types
+ cr.addReservation(types.HardDriveType, 5)
+ cr.addReservation(types.SsdType, 3)
+
+ // Check counts are separate
+ if count := cr.getReservedCount(types.HardDriveType); count != 5 {
+ t.Errorf("Expected 5 HDD reserved count, got %d", count)
+ }
+
+ if count := cr.getReservedCount(types.SsdType); count != 3 {
+ t.Errorf("Expected 3 SSD reserved count, got %d", count)
+ }
+}
+
+func TestNodeImpl_ReservationMethods(t *testing.T) {
+ // Create a test data node
+ dn := NewDataNode("test-node")
+ diskType := types.HardDriveType
+
+ // Set up some capacity
+ diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
+ diskUsage.maxVolumeCount = 10
+ diskUsage.volumeCount = 5 // 5 volumes free initially
+
+ option := &VolumeGrowOption{DiskType: diskType}
+
+ // Test available space calculation
+ available := dn.AvailableSpaceFor(option)
+ if available != 5 {
+ t.Errorf("Expected 5 available slots, got %d", available)
+ }
+
+ availableForReservation := dn.AvailableSpaceForReservation(option)
+ if availableForReservation != 5 {
+ t.Errorf("Expected 5 available slots for reservation, got %d", availableForReservation)
+ }
+
+ // Test successful reservation
+ reservationId, success := dn.TryReserveCapacity(diskType, 3)
+ if !success {
+ t.Error("Expected successful reservation")
+ }
+ if reservationId == "" {
+ t.Error("Expected non-empty reservation ID")
+ }
+
+ // Available space should be reduced by reservations
+ availableForReservation = dn.AvailableSpaceForReservation(option)
+ if availableForReservation != 2 {
+ t.Errorf("Expected 2 available slots after reservation, got %d", availableForReservation)
+ }
+
+ // Base available space should remain unchanged
+ available = dn.AvailableSpaceFor(option)
+ if available != 5 {
+ t.Errorf("Expected base available to remain 5, got %d", available)
+ }
+
+ // Test reservation failure when insufficient capacity
+ _, success = dn.TryReserveCapacity(diskType, 3)
+ if success {
+ t.Error("Expected reservation failure due to insufficient capacity")
+ }
+
+ // Test release reservation
+ dn.ReleaseReservedCapacity(reservationId)
+ availableForReservation = dn.AvailableSpaceForReservation(option)
+ if availableForReservation != 5 {
+ t.Errorf("Expected 5 available slots after release, got %d", availableForReservation)
+ }
+}
+
+func TestNodeImpl_ConcurrentReservations(t *testing.T) {
+ dn := NewDataNode("test-node")
+ diskType := types.HardDriveType
+
+ // Set up capacity
+ diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
+ diskUsage.maxVolumeCount = 10
+ diskUsage.volumeCount = 0 // 10 volumes free initially
+
+ // Test concurrent reservations using goroutines
+ var wg sync.WaitGroup
+ var reservationIds sync.Map
+ concurrentRequests := 10
+ wg.Add(concurrentRequests)
+
+ for i := 0; i < concurrentRequests; i++ {
+ go func(i int) {
+ defer wg.Done()
+ if reservationId, success := dn.TryReserveCapacity(diskType, 1); success {
+ reservationIds.Store(reservationId, true)
+ t.Logf("goroutine %d: Successfully reserved %s", i, reservationId)
+ } else {
+ t.Errorf("goroutine %d: Expected successful reservation", i)
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // Should have no more capacity
+ option := &VolumeGrowOption{DiskType: diskType}
+ if available := dn.AvailableSpaceForReservation(option); available != 0 {
+ t.Errorf("Expected 0 available slots after all reservations, got %d", available)
+ // Debug: check total reserved
+ reservedCount := dn.capacityReservations.getReservedCount(diskType)
+ t.Logf("Debug: Total reserved count: %d", reservedCount)
+ }
+
+ // Next reservation should fail
+ _, success := dn.TryReserveCapacity(diskType, 1)
+ if success {
+ t.Error("Expected reservation failure when at capacity")
+ }
+
+ // Release all reservations
+ reservationIds.Range(func(key, value interface{}) bool {
+ dn.ReleaseReservedCapacity(key.(string))
+ return true
+ })
+
+ // Should have full capacity back
+ if available := dn.AvailableSpaceForReservation(option); available != 10 {
+ t.Errorf("Expected 10 available slots after releasing all, got %d", available)
+ }
+}
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
index 03fe20c10..e036621b4 100644
--- a/weed/topology/data_center.go
+++ b/weed/topology/data_center.go
@@ -1,9 +1,10 @@
package topology
import (
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"slices"
"strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
type DataCenter struct {
@@ -16,6 +17,7 @@ func NewDataCenter(id string) *DataCenter {
dc.nodeType = "DataCenter"
dc.diskUsages = newDiskUsages()
dc.children = make(map[NodeId]Node)
+ dc.capacityReservations = newCapacityReservations()
dc.NodeImpl.value = dc
return dc
}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 3103dc207..4f2dbe464 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -30,6 +30,7 @@ func NewDataNode(id string) *DataNode {
dn.nodeType = "DataNode"
dn.diskUsages = newDiskUsages()
dn.children = make(map[NodeId]Node)
+ dn.capacityReservations = newCapacityReservations()
dn.NodeImpl.value = dn
return dn
}
diff --git a/weed/topology/node.go b/weed/topology/node.go
index aa178b561..60e7427af 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -2,6 +2,7 @@ package topology
import (
"errors"
+ "fmt"
"math/rand/v2"
"strings"
"sync"
@@ -16,15 +17,124 @@ import (
)
type NodeId string
+
+// CapacityReservation represents a temporary reservation of capacity
+type CapacityReservation struct {
+ reservationId string
+ diskType types.DiskType
+ count int64
+ createdAt time.Time
+}
+
+// CapacityReservations manages capacity reservations for a node
+type CapacityReservations struct {
+ sync.RWMutex
+ reservations map[string]*CapacityReservation
+ reservedCounts map[types.DiskType]int64
+}
+
+func newCapacityReservations() *CapacityReservations {
+ return &CapacityReservations{
+ reservations: make(map[string]*CapacityReservation),
+ reservedCounts: make(map[types.DiskType]int64),
+ }
+}
+
+func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
+ cr.Lock()
+ defer cr.Unlock()
+
+ return cr.doAddReservation(diskType, count)
+}
+
+func (cr *CapacityReservations) removeReservation(reservationId string) bool {
+ cr.Lock()
+ defer cr.Unlock()
+
+ if reservation, exists := cr.reservations[reservationId]; exists {
+ delete(cr.reservations, reservationId)
+ cr.decrementCount(reservation.diskType, reservation.count)
+ return true
+ }
+ return false
+}
+
+func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
+ cr.RLock()
+ defer cr.RUnlock()
+
+ return cr.reservedCounts[diskType]
+}
+
+// decrementCount is a helper to decrement reserved count and clean up zero entries
+func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
+ cr.reservedCounts[diskType] -= count
+ // Clean up zero counts to prevent map growth
+ if cr.reservedCounts[diskType] <= 0 {
+ delete(cr.reservedCounts, diskType)
+ }
+}
+
+// doAddReservation is a helper to add a reservation, assuming the lock is already held
+func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
+ now := time.Now()
+ reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
+ cr.reservations[reservationId] = &CapacityReservation{
+ reservationId: reservationId,
+ diskType: diskType,
+ count: count,
+ createdAt: now,
+ }
+ cr.reservedCounts[diskType] += count
+ return reservationId
+}
+
+// tryReserveAtomic atomically checks available space and reserves if possible
+func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
+ cr.Lock()
+ defer cr.Unlock()
+
+ // Check available space under lock
+ currentReserved := cr.reservedCounts[diskType]
+ availableSpace := availableSpaceFunc() - currentReserved
+
+ if availableSpace >= count {
+ // Create and add reservation atomically
+ return cr.doAddReservation(diskType, count), true
+ }
+
+ return "", false
+}
+
+func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
+ cr.Lock()
+ defer cr.Unlock()
+
+ now := time.Now()
+ for id, reservation := range cr.reservations {
+ if now.Sub(reservation.createdAt) > expirationDuration {
+ delete(cr.reservations, id)
+ cr.decrementCount(reservation.diskType, reservation.count)
+ glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
+ }
+ }
+}
+
type Node interface {
Id() NodeId
String() string
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
+ ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetDiskUsages() *DiskUsages
+ // Capacity reservation methods for avoiding race conditions
+ TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
+ ReleaseReservedCapacity(reservationId string)
+ AvailableSpaceForReservation(option *VolumeGrowOption) int64
+
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -52,6 +162,9 @@ type NodeImpl struct {
//for rack, data center, topology
nodeType string
value interface{}
+
+ // capacity reservations to prevent race conditions during volume creation
+ capacityReservations *CapacityReservations
}
func (n *NodeImpl) GetDiskUsages() *DiskUsages {
@@ -164,6 +277,42 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
}
return freeVolumeSlotCount
}
+
+// AvailableSpaceForReservation returns available space considering existing reservations
+func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
+ baseAvailable := n.AvailableSpaceFor(option)
+ reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
+ return baseAvailable - reservedCount
+}
+
+// TryReserveCapacity attempts to atomically reserve capacity for volume creation
+func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
+ const reservationTimeout = 5 * time.Minute // TODO: make this configurable
+
+ // Clean up any expired reservations first
+ n.capacityReservations.cleanExpiredReservations(reservationTimeout)
+
+ // Atomically check and reserve space
+ option := &VolumeGrowOption{DiskType: diskType}
+ reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
+ return n.AvailableSpaceFor(option)
+ })
+
+ if success {
+ glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
+ }
+
+ return reservationId, success
+}
+
+// ReleaseReservedCapacity releases a previously reserved capacity
+func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
+ if n.capacityReservations.removeReservation(reservationId) {
+ glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
+ } else {
+ glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
+ }
+}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
@@ -186,10 +335,24 @@ func (n *NodeImpl) GetValue() interface{} {
}
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+ return n.reserveOneVolumeInternal(r, option, false)
+}
+
+// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
+func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
+ return n.reserveOneVolumeInternal(r, option, true)
+}
+
+func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
- freeSpace := node.AvailableSpaceFor(option)
+ var freeSpace int64
+ if useReservations {
+ freeSpace = node.AvailableSpaceForReservation(option)
+ } else {
+ freeSpace = node.AvailableSpaceFor(option)
+ }
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
@@ -197,7 +360,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
if r >= freeSpace {
r -= freeSpace
} else {
- if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
+ var hasSpace bool
+ if useReservations {
+ hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
+ } else {
+ hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
+ }
+ if hasSpace {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
dn := node.(*DataNode)
if dn.IsTerminating {
@@ -205,7 +374,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
}
return dn, nil
}
- assignedNode, err = node.ReserveOneVolume(r, option)
+ if useReservations {
+ assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
+ } else {
+ assignedNode, err = node.ReserveOneVolume(r, option)
+ }
if err == nil {
return
}
diff --git a/weed/topology/race_condition_stress_test.go b/weed/topology/race_condition_stress_test.go
new file mode 100644
index 000000000..a60f0a32a
--- /dev/null
+++ b/weed/topology/race_condition_stress_test.go
@@ -0,0 +1,306 @@
+package topology
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/sequence"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// TestRaceConditionStress simulates the original issue scenario:
+// High concurrent writes causing capacity misjudgment
+func TestRaceConditionStress(t *testing.T) {
+ // Create a cluster similar to the issue description:
+ // 3 volume servers, 200GB each, 5GB volume limit = 40 volumes max per server
+ const (
+ numServers = 3
+ volumeLimitMB = 5000 // 5GB in MB
+ storagePerServerGB = 200 // 200GB per server
+ maxVolumesPerServer = storagePerServerGB * 1024 / volumeLimitMB // 200*1024/5000 = 40
+ concurrentRequests = 50 // High concurrency like the issue
+ )
+
+ // Create test topology
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), uint64(volumeLimitMB)*1024*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Create 3 volume servers with realistic capacity
+ servers := make([]*DataNode, numServers)
+ for i := 0; i < numServers; i++ {
+ dn := NewDataNode(fmt.Sprintf("server%d", i+1))
+ rack.LinkChildNode(dn)
+
+ // Set up disk with capacity for 40 volumes
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = maxVolumesPerServer
+ dn.LinkChildNode(disk)
+
+ servers[i] = dn
+ }
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000") // Single replica like the issue
+
+ option := &VolumeGrowOption{
+ Collection: "test-bucket-large", // Same collection name as issue
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Track results
+ var successfulAllocations int64
+ var failedAllocations int64
+ var totalVolumesCreated int64
+
+ var wg sync.WaitGroup
+
+ // Launch concurrent volume creation requests
+ startTime := time.Now()
+ for i := 0; i < concurrentRequests; i++ {
+ wg.Add(1)
+ go func(requestId int) {
+ defer wg.Done()
+
+ // This is the critical test: multiple threads trying to allocate simultaneously
+ servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+
+ if err != nil {
+ atomic.AddInt64(&failedAllocations, 1)
+ t.Logf("Request %d failed: %v", requestId, err)
+ return
+ }
+
+ // Simulate volume creation delay (like in real scenario)
+ time.Sleep(time.Millisecond * 50)
+
+ // Simulate successful volume creation
+ for _, server := range servers {
+ disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+ atomic.AddInt64(&totalVolumesCreated, 1)
+ }
+
+ // Release reservations (simulates successful registration)
+ reservation.releaseAllReservations()
+ atomic.AddInt64(&successfulAllocations, 1)
+
+ }(i)
+ }
+
+ wg.Wait()
+ duration := time.Since(startTime)
+
+ // Verify results
+ t.Logf("Test completed in %v", duration)
+ t.Logf("Successful allocations: %d", successfulAllocations)
+ t.Logf("Failed allocations: %d", failedAllocations)
+ t.Logf("Total volumes created: %d", totalVolumesCreated)
+
+ // Check capacity limits are respected
+ totalCapacityUsed := int64(0)
+ for i, server := range servers {
+ disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
+ volumeCount := disk.diskUsages.getOrCreateDisk(types.HardDriveType).volumeCount
+ totalCapacityUsed += volumeCount
+
+ t.Logf("Server %d: %d volumes (max: %d)", i+1, volumeCount, maxVolumesPerServer)
+
+ // Critical test: No server should exceed its capacity
+ if volumeCount > maxVolumesPerServer {
+ t.Errorf("RACE CONDITION DETECTED: Server %d exceeded capacity: %d > %d",
+ i+1, volumeCount, maxVolumesPerServer)
+ }
+ }
+
+ // Verify totals make sense
+ if totalVolumesCreated != totalCapacityUsed {
+ t.Errorf("Volume count mismatch: created=%d, actual=%d", totalVolumesCreated, totalCapacityUsed)
+ }
+
+ // The total should never exceed the cluster capacity (120 volumes for 3 servers × 40 each)
+ maxClusterCapacity := int64(numServers * maxVolumesPerServer)
+ if totalCapacityUsed > maxClusterCapacity {
+ t.Errorf("RACE CONDITION DETECTED: Cluster capacity exceeded: %d > %d",
+ totalCapacityUsed, maxClusterCapacity)
+ }
+
+ // With reservations, we should have controlled allocation
+ // Total requests = successful + failed should equal concurrentRequests
+ if successfulAllocations+failedAllocations != concurrentRequests {
+ t.Errorf("Request count mismatch: success=%d + failed=%d != total=%d",
+ successfulAllocations, failedAllocations, concurrentRequests)
+ }
+
+ t.Logf("✅ Race condition test passed: Capacity limits respected with %d concurrent requests",
+ concurrentRequests)
+}
+
+// TestCapacityJudgmentAccuracy verifies that the capacity calculation is accurate
+// under various load conditions
+func TestCapacityJudgmentAccuracy(t *testing.T) {
+ // Create a single server with known capacity
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 5*1024*1024*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ // Server with capacity for exactly 10 volumes
+ disk := NewDisk(types.HardDriveType.String())
+ diskUsage := disk.diskUsages.getOrCreateDisk(types.HardDriveType)
+ diskUsage.maxVolumeCount = 10
+ dn.LinkChildNode(disk)
+
+ // Also set max volume count on the DataNode level (gets propagated up)
+ dn.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 10
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000")
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Test accurate capacity reporting at each step
+ for i := 0; i < 10; i++ {
+ // Check available space before reservation
+ availableBefore := dn.AvailableSpaceFor(option)
+ availableForReservation := dn.AvailableSpaceForReservation(option)
+
+ expectedAvailable := int64(10 - i)
+ if availableBefore != expectedAvailable {
+ t.Errorf("Step %d: Expected %d available, got %d", i, expectedAvailable, availableBefore)
+ }
+
+ if availableForReservation != expectedAvailable {
+ t.Errorf("Step %d: Expected %d available for reservation, got %d", i, expectedAvailable, availableForReservation)
+ }
+
+ // Try to reserve and allocate
+ _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err != nil {
+ t.Fatalf("Step %d: Unexpected reservation failure: %v", i, err)
+ }
+
+ // Check that available space for reservation decreased
+ availableAfterReservation := dn.AvailableSpaceForReservation(option)
+ if availableAfterReservation != expectedAvailable-1 {
+ t.Errorf("Step %d: Expected %d available after reservation, got %d",
+ i, expectedAvailable-1, availableAfterReservation)
+ }
+
+ // Simulate successful volume creation by properly updating disk usage hierarchy
+ disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+
+ // Create a volume usage delta to simulate volume creation
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+
+ // Properly propagate the usage up the hierarchy
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+
+ // Debug: Check the volume count after update
+ diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
+ currentVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
+ t.Logf("Step %d: Volume count after update: %d", i, currentVolumeCount)
+
+ // Release reservation
+ reservation.releaseAllReservations()
+
+ // Verify final state
+ availableAfter := dn.AvailableSpaceFor(option)
+ expectedAfter := int64(10 - i - 1)
+ if availableAfter != expectedAfter {
+ t.Errorf("Step %d: Expected %d available after creation, got %d",
+ i, expectedAfter, availableAfter)
+ // More debugging
+ diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
+ maxVolumes := atomic.LoadInt64(&diskUsageOnNode.maxVolumeCount)
+ remoteVolumes := atomic.LoadInt64(&diskUsageOnNode.remoteVolumeCount)
+ actualVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
+ t.Logf("Debug Step %d: max=%d, volume=%d, remote=%d", i, maxVolumes, actualVolumeCount, remoteVolumes)
+ }
+ }
+
+ // At this point, no more reservations should succeed
+ _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err == nil {
+ t.Error("Expected reservation to fail when at capacity")
+ }
+
+ t.Logf("✅ Capacity judgment accuracy test passed")
+}
+
+// TestReservationSystemPerformance measures the performance impact of reservations
+func TestReservationSystemPerformance(t *testing.T) {
+ // Create topology
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1000
+ dn.LinkChildNode(disk)
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000")
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Benchmark reservation operations
+ const iterations = 1000
+
+ startTime := time.Now()
+ for i := 0; i < iterations; i++ {
+ _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err != nil {
+ t.Fatalf("Iteration %d failed: %v", i, err)
+ }
+ reservation.releaseAllReservations()
+
+ // Simulate volume creation
+ diskUsage := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
+ atomic.AddInt64(&diskUsage.volumeCount, 1)
+ }
+ duration := time.Since(startTime)
+
+ avgDuration := duration / iterations
+ t.Logf("Performance: %d reservations in %v (avg: %v per reservation)",
+ iterations, duration, avgDuration)
+
+ // Performance should be reasonable (less than 1ms per reservation on average)
+ if avgDuration > time.Millisecond {
+ t.Errorf("Reservation system performance concern: %v per reservation", avgDuration)
+ } else {
+ t.Logf("✅ Performance test passed: %v per reservation", avgDuration)
+ }
+}
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index d82ef7986..f526cd84d 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -1,12 +1,13 @@
package topology
import (
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/util"
"slices"
"strings"
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
type Rack struct {
@@ -19,6 +20,7 @@ func NewRack(id string) *Rack {
r.nodeType = "Rack"
r.diskUsages = newDiskUsages()
r.children = make(map[NodeId]Node)
+ r.capacityReservations = newCapacityReservations()
r.NodeImpl.value = r
return r
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 750c00ea2..bbae97d72 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -67,6 +67,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.NodeImpl.value = t
t.diskUsages = newDiskUsages()
t.children = make(map[NodeId]Node)
+ t.capacityReservations = newCapacityReservations()
t.collectionMap = util.NewConcurrentReadMap()
t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index c62fd72a0..f7af4e0a5 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -74,6 +74,22 @@ type VolumeGrowth struct {
accessLock sync.Mutex
}
+// VolumeGrowReservation tracks capacity reservations for a volume creation operation
+type VolumeGrowReservation struct {
+ servers []*DataNode
+ reservationIds []string
+ diskType types.DiskType
+}
+
+// releaseAllReservations releases all reservations in this volume grow operation
+func (vgr *VolumeGrowReservation) releaseAllReservations() {
+ for i, server := range vgr.servers {
+ if i < len(vgr.reservationIds) && vgr.reservationIds[i] != "" {
+ server.ReleaseReservedCapacity(vgr.reservationIds[i])
+ }
+ }
+}
+
func (o *VolumeGrowOption) String() string {
blob, _ := json.Marshal(o)
return string(blob)
@@ -125,10 +141,17 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe
}
func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
- servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+ servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations
if e != nil {
return nil, e
}
+ // Ensure reservations are released if anything goes wrong
+ defer func() {
+ if err != nil && reservation != nil {
+ reservation.releaseAllReservations()
+ }
+ }()
+
for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) {
glog.V(0).Infof("wait for volume servers to join back")
time.Sleep(constants.VolumePulseSeconds / 2)
@@ -137,7 +160,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
if raftErr != nil {
return nil, raftErr
}
- if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
+ if err = vg.grow(grpcDialOption, topo, vid, option, reservation, servers...); err == nil {
for _, server := range servers {
result = append(result, &master_pb.VolumeLocation{
Url: server.Url(),
@@ -156,9 +179,37 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
// 2.2 collect all racks that have rp.SameRackCount+1
// 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1
// 2. find rest data nodes
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+// If useReservations is true, reserves capacity on each server and returns reservation info
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
+
+ // Select appropriate functions based on useReservations flag
+ var availableSpaceFunc func(Node, *VolumeGrowOption) int64
+ var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error)
+
+ if useReservations {
+ availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
+ return node.AvailableSpaceForReservation(option)
+ }
+ reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
+ return node.ReserveOneVolumeForReservation(r, option)
+ }
+ } else {
+ availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
+ return node.AvailableSpaceFor(option)
+ }
+ reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
+ return node.ReserveOneVolume(r, option)
+ }
+ }
+
+ // Ensure cleanup of partial reservations on error
+ defer func() {
+ if err != nil && reservation != nil {
+ reservation.releaseAllReservations()
+ }
+ }()
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
@@ -166,14 +217,14 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
- if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
- return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1)
+ if availableSpaceFunc(node, option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
+ return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
- if n.AvailableSpaceFor(option) >= 1 {
+ if availableSpaceFunc(n, option) >= 1 {
possibleDataNodesCount++
}
}
@@ -187,7 +238,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return nil
})
if dc_err != nil {
- return nil, dc_err
+ return nil, nil, dc_err
}
//find main rack and other racks
@@ -195,8 +246,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
- if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) {
- return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1)
+ if availableSpaceFunc(node, option) < int64(rp.SameRackCount+1) {
+ return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
// a bit faster way to test free racks
@@ -204,7 +255,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
possibleDataNodesCount := 0
for _, n := range node.Children() {
- if n.AvailableSpaceFor(option) >= 1 {
+ if availableSpaceFunc(n, option) >= 1 {
possibleDataNodesCount++
}
}
@@ -214,7 +265,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return nil
})
if rackErr != nil {
- return nil, rackErr
+ return nil, nil, rackErr
}
//find main server and other servers
@@ -222,13 +273,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
- if node.AvailableSpaceFor(option) < 1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1)
+ if availableSpaceFunc(node, option) < 1 {
+ return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1)
}
return nil
})
if serverErr != nil {
- return nil, serverErr
+ return nil, nil, serverErr
}
servers = append(servers, mainServer.(*DataNode))
@@ -236,25 +287,47 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
- r := rand.Int64N(rack.AvailableSpaceFor(option))
- if server, e := rack.ReserveOneVolume(r, option); e == nil {
+ r := rand.Int64N(availableSpaceFunc(rack, option))
+ if server, e := reserveOneVolumeFunc(rack, r, option); e == nil {
servers = append(servers, server)
} else {
- return servers, e
+ return servers, nil, e
}
}
for _, datacenter := range otherDataCenters {
- r := rand.Int64N(datacenter.AvailableSpaceFor(option))
- if server, e := datacenter.ReserveOneVolume(r, option); e == nil {
+ r := rand.Int64N(availableSpaceFunc(datacenter, option))
+ if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil {
servers = append(servers, server)
} else {
- return servers, e
+ return servers, nil, e
}
}
- return
+
+ // If reservations are requested, try to reserve capacity on each server
+ if useReservations {
+ reservation = &VolumeGrowReservation{
+ servers: servers,
+ reservationIds: make([]string, len(servers)),
+ diskType: option.DiskType,
+ }
+
+ // Try to reserve capacity on each server
+ for i, server := range servers {
+ reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
+ if !success {
+ return servers, nil, fmt.Errorf("failed to reserve capacity on server %s", server.Id())
+ }
+ reservation.reservationIds[i] = reservationId
+ }
+
+ glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers))
+ }
+
+ return servers, reservation, nil
}
-func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) (growErr error) {
+// grow creates volumes on the provided servers, optionally managing capacity reservations
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, reservation *VolumeGrowReservation, servers ...*DataNode) (growErr error) {
var createdVolumes []storage.VolumeInfo
for _, server := range servers {
if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
@@ -283,6 +356,10 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String())
}
+ // Release reservations on success since volumes are now registered
+ if reservation != nil {
+ reservation.releaseAllReservations()
+ }
} else {
// cleaning up created volume replicas
for i, vi := range createdVolumes {
@@ -291,6 +368,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String())
}
}
+ // Reservations will be released by the caller in case of failure
}
return growErr
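
To recap the caller-side contract established above, a condensed sketch (assuming the names vg, topo, option, vid and grpcDialOption from this file): findEmptySlotsForOneVolume reserves capacity up front and releases partial reservations itself on failure, grow releases them once the volumes are registered, and the caller rolls back if growth fails. The wrapper name growWithReservationSketch is illustrative only.

package topology

import (
	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// Illustrative sketch of the reservation lifecycle around volume growth.
func growWithReservationSketch(vg *VolumeGrowth, grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption) (err error) {
	// Reserves one slot per selected server; partial reservations are released internally on error.
	servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
	if err != nil {
		return err
	}
	// Roll back the reserved slots if volume growth fails.
	defer func() {
		if err != nil && reservation != nil {
			reservation.releaseAllReservations()
		}
	}()
	// grow releases the reservations itself once the volumes are registered.
	return vg.grow(grpcDialOption, topo, vid, option, reservation, servers...)
}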
diff --git a/weed/topology/volume_growth_reservation_test.go b/weed/topology/volume_growth_reservation_test.go
new file mode 100644
index 000000000..7b06e626d
--- /dev/null
+++ b/weed/topology/volume_growth_reservation_test.go
@@ -0,0 +1,276 @@
+package topology
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/sequence"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// MockGrpcDialOption simulates grpc connection for testing
+type MockGrpcDialOption struct{}
+
+// simulateVolumeAllocation mocks the volume allocation process
+func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error {
+ // Simulate some processing time
+ time.Sleep(time.Millisecond * 10)
+ return nil
+}
+
+func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) {
+ // Create test topology with single server for predictable behavior
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ // Create data center and rack
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Create single data node with limited capacity
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ // Set up disk with limited capacity (only 5 volumes)
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn.LinkChildNode(disk)
+
+ // Test volume growth with reservation
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Try to create volumes and verify reservations work
+ for i := 0; i < 5; i++ {
+ servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err != nil {
+ t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err)
+ continue
+ }
+
+ if len(servers) != 1 {
+ t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers))
+ }
+
+ if len(reservation.reservationIds) != 1 {
+ t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds))
+ }
+
+ // Verify the reservation is on our expected server
+ server := servers[0]
+ if server != dn {
+ t.Errorf("Expected volume to be allocated on server1, got %s", server.Id())
+ }
+
+ // Check available space before and after reservation
+ availableBeforeCreation := server.AvailableSpaceFor(option)
+ expectedBefore := int64(5 - i)
+ if availableBeforeCreation != expectedBefore {
+ t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation)
+ }
+
+ // Simulate successful volume creation
+ disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+
+ // Release reservation after successful creation
+ reservation.releaseAllReservations()
+
+ // Verify available space after creation
+ availableAfterCreation := server.AvailableSpaceFor(option)
+ expectedAfter := int64(5 - i - 1)
+ if availableAfterCreation != expectedAfter {
+ t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation)
+ }
+ }
+
+ // After 5 volumes, should have no more capacity
+ _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err == nil {
+ t.Error("Expected volume allocation to fail when server is at capacity")
+ }
+}
+
+func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) {
+ // Create test topology with very limited capacity
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Single data node with capacity for only 5 volumes
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn.LinkChildNode(disk)
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Simulate concurrent volume creation attempts
+ const concurrentRequests = 10
+ var wg sync.WaitGroup
+ var successCount, failureCount atomic.Int32
+
+ for i := 0; i < concurrentRequests; i++ {
+ wg.Add(1)
+ go func(requestId int) {
+ defer wg.Done()
+
+ _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+
+ if err != nil {
+ failureCount.Add(1)
+ t.Logf("Request %d failed as expected: %v", requestId, err)
+ } else {
+ successCount.Add(1)
+ t.Logf("Request %d succeeded, got reservation", requestId)
+
+ // Release the reservation to simulate completion
+ if reservation != nil {
+ reservation.releaseAllReservations()
+ // Simulate volume creation by incrementing count
+ disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+ }
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // With reservation system, only 5 requests should succeed (capacity limit)
+ // The rest should fail due to insufficient capacity
+ if successCount.Load() != 5 {
+ t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load())
+ }
+
+ if failureCount.Load() != 5 {
+ t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load())
+ }
+
+ // Verify final state
+ finalAvailable := dn.AvailableSpaceFor(option)
+ if finalAvailable != 0 {
+ t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable)
+ }
+
+ t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load())
+}
+
+func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) {
+ // Create topology with multiple servers, but limited total capacity
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Create two servers with different available capacity
+ dn1 := NewDataNode("server1")
+ dn2 := NewDataNode("server2")
+ rack.LinkChildNode(dn1)
+ rack.LinkChildNode(dn2)
+
+ // Server 1: 5 available slots
+ disk1 := NewDisk(types.HardDriveType.String())
+ disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn1.LinkChildNode(disk1)
+
+ // Server 2: 0 available slots (full)
+ disk2 := NewDisk(types.HardDriveType.String())
+ diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType)
+ diskUsage2.maxVolumeCount = 5
+ diskUsage2.volumeCount = 5
+ dn2.LinkChildNode(disk2)
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // This should fail because we can't satisfy replica requirements
+ // (need 2 servers but only 1 has space)
+ _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err == nil {
+ t.Error("Expected reservation to fail due to insufficient replica capacity")
+ }
+
+ // Verify no reservations are left hanging
+ available1 := dn1.AvailableSpaceForReservation(option)
+ if available1 != 5 {
+ t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1)
+ }
+
+ available2 := dn2.AvailableSpaceForReservation(option)
+ if available2 != 0 {
+ t.Errorf("Expected server2 to have no capacity available, got %d", available2)
+ }
+}
+
+func TestVolumeGrowth_ReservationTimeout(t *testing.T) {
+ dn := NewDataNode("server1")
+ diskType := types.HardDriveType
+
+ // Set up capacity
+ diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
+ diskUsage.maxVolumeCount = 5
+
+ // Create a reservation
+ reservationId, success := dn.TryReserveCapacity(diskType, 2)
+ if !success {
+ t.Fatal("Expected successful reservation")
+ }
+
+ // Manually set the reservation time to simulate old reservation
+ dn.capacityReservations.Lock()
+ if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists {
+ reservation.createdAt = time.Now().Add(-10 * time.Minute)
+ }
+ dn.capacityReservations.Unlock()
+
+ // Try another reservation - this should trigger cleanup and succeed
+ _, success = dn.TryReserveCapacity(diskType, 3)
+ if !success {
+ t.Error("Expected reservation to succeed after cleanup of expired reservation")
+ }
+
+ // Original reservation should be cleaned up
+ option := &VolumeGrowOption{DiskType: diskType}
+ available := dn.AvailableSpaceForReservation(option)
+ if available != 2 { // 5 - 3 = 2
+ t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available)
+ }
+}
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index 286289148..9bf3f3747 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -145,7 +145,7 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) {
Rack: "",
DataNode: "",
}
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()
@@ -267,7 +267,7 @@ func TestReplication011(t *testing.T) {
Rack: "",
DataNode: "",
}
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()
@@ -345,7 +345,7 @@ func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
distribution := map[NodeId]int{}
// assign 1000 volumes
for i := 0; i < 1000; i++ {
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ servers, _, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, false)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()