aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/volume_growth_reservation_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/volume_growth_reservation_test.go')
-rw-r--r--weed/topology/volume_growth_reservation_test.go276
1 files changed, 276 insertions, 0 deletions
diff --git a/weed/topology/volume_growth_reservation_test.go b/weed/topology/volume_growth_reservation_test.go
new file mode 100644
index 000000000..7b06e626d
--- /dev/null
+++ b/weed/topology/volume_growth_reservation_test.go
@@ -0,0 +1,276 @@
+package topology
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/sequence"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// MockGrpcDialOption simulates grpc connection for testing
+type MockGrpcDialOption struct{}
+
+// simulateVolumeAllocation mocks the volume allocation process
+func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error {
+ // Simulate some processing time
+ time.Sleep(time.Millisecond * 10)
+ return nil
+}
+
+func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) {
+ // Create test topology with single server for predictable behavior
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ // Create data center and rack
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Create single data node with limited capacity
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ // Set up disk with limited capacity (only 5 volumes)
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn.LinkChildNode(disk)
+
+ // Test volume growth with reservation
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Try to create volumes and verify reservations work
+ for i := 0; i < 5; i++ {
+ servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err != nil {
+ t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err)
+ continue
+ }
+
+ if len(servers) != 1 {
+ t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers))
+ }
+
+ if len(reservation.reservationIds) != 1 {
+ t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds))
+ }
+
+ // Verify the reservation is on our expected server
+ server := servers[0]
+ if server != dn {
+ t.Errorf("Expected volume to be allocated on server1, got %s", server.Id())
+ }
+
+ // Check available space before and after reservation
+ availableBeforeCreation := server.AvailableSpaceFor(option)
+ expectedBefore := int64(5 - i)
+ if availableBeforeCreation != expectedBefore {
+ t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation)
+ }
+
+ // Simulate successful volume creation
+ disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+
+ // Release reservation after successful creation
+ reservation.releaseAllReservations()
+
+ // Verify available space after creation
+ availableAfterCreation := server.AvailableSpaceFor(option)
+ expectedAfter := int64(5 - i - 1)
+ if availableAfterCreation != expectedAfter {
+ t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation)
+ }
+ }
+
+ // After 5 volumes, should have no more capacity
+ _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err == nil {
+ t.Error("Expected volume allocation to fail when server is at capacity")
+ }
+}
+
+func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) {
+ // Create test topology with very limited capacity
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Single data node with capacity for only 5 volumes
+ dn := NewDataNode("server1")
+ rack.LinkChildNode(dn)
+
+ disk := NewDisk(types.HardDriveType.String())
+ disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn.LinkChildNode(disk)
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // Simulate concurrent volume creation attempts
+ const concurrentRequests = 10
+ var wg sync.WaitGroup
+ var successCount, failureCount atomic.Int32
+
+ for i := 0; i < concurrentRequests; i++ {
+ wg.Add(1)
+ go func(requestId int) {
+ defer wg.Done()
+
+ _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+
+ if err != nil {
+ failureCount.Add(1)
+ t.Logf("Request %d failed as expected: %v", requestId, err)
+ } else {
+ successCount.Add(1)
+ t.Logf("Request %d succeeded, got reservation", requestId)
+
+ // Release the reservation to simulate completion
+ if reservation != nil {
+ reservation.releaseAllReservations()
+ // Simulate volume creation by incrementing count
+ disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
+ deltaDiskUsage := &DiskUsageCounts{
+ volumeCount: 1,
+ }
+ disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
+ }
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // With reservation system, only 5 requests should succeed (capacity limit)
+ // The rest should fail due to insufficient capacity
+ if successCount.Load() != 5 {
+ t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load())
+ }
+
+ if failureCount.Load() != 5 {
+ t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load())
+ }
+
+ // Verify final state
+ finalAvailable := dn.AvailableSpaceFor(option)
+ if finalAvailable != 0 {
+ t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable)
+ }
+
+ t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load())
+}
+
+func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) {
+ // Create topology with multiple servers, but limited total capacity
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := NewDataCenter("dc1")
+ topo.LinkChildNode(dc)
+ rack := NewRack("rack1")
+ dc.LinkChildNode(rack)
+
+ // Create two servers with different available capacity
+ dn1 := NewDataNode("server1")
+ dn2 := NewDataNode("server2")
+ rack.LinkChildNode(dn1)
+ rack.LinkChildNode(dn2)
+
+ // Server 1: 5 available slots
+ disk1 := NewDisk(types.HardDriveType.String())
+ disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
+ dn1.LinkChildNode(disk1)
+
+ // Server 2: 0 available slots (full)
+ disk2 := NewDisk(types.HardDriveType.String())
+ diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType)
+ diskUsage2.maxVolumeCount = 5
+ diskUsage2.volumeCount = 5
+ dn2.LinkChildNode(disk2)
+
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas
+
+ option := &VolumeGrowOption{
+ Collection: "test",
+ ReplicaPlacement: rp,
+ DiskType: types.HardDriveType,
+ }
+
+ // This should fail because we can't satisfy replica requirements
+ // (need 2 servers but only 1 has space)
+ _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
+ if err == nil {
+ t.Error("Expected reservation to fail due to insufficient replica capacity")
+ }
+
+ // Verify no reservations are left hanging
+ available1 := dn1.AvailableSpaceForReservation(option)
+ if available1 != 5 {
+ t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1)
+ }
+
+ available2 := dn2.AvailableSpaceForReservation(option)
+ if available2 != 0 {
+ t.Errorf("Expected server2 to have no capacity available, got %d", available2)
+ }
+}
+
+func TestVolumeGrowth_ReservationTimeout(t *testing.T) {
+ dn := NewDataNode("server1")
+ diskType := types.HardDriveType
+
+ // Set up capacity
+ diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
+ diskUsage.maxVolumeCount = 5
+
+ // Create a reservation
+ reservationId, success := dn.TryReserveCapacity(diskType, 2)
+ if !success {
+ t.Fatal("Expected successful reservation")
+ }
+
+ // Manually set the reservation time to simulate old reservation
+ dn.capacityReservations.Lock()
+ if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists {
+ reservation.createdAt = time.Now().Add(-10 * time.Minute)
+ }
+ dn.capacityReservations.Unlock()
+
+ // Try another reservation - this should trigger cleanup and succeed
+ _, success = dn.TryReserveCapacity(diskType, 3)
+ if !success {
+ t.Error("Expected reservation to succeed after cleanup of expired reservation")
+ }
+
+ // Original reservation should be cleaned up
+ option := &VolumeGrowOption{DiskType: diskType}
+ available := dn.AvailableSpaceForReservation(option)
+ if available != 2 { // 5 - 3 = 2
+ t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available)
+ }
+}