diff options
Diffstat (limited to 'weed/topology/volume_growth_reservation_test.go')
| -rw-r--r-- | weed/topology/volume_growth_reservation_test.go | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/weed/topology/volume_growth_reservation_test.go b/weed/topology/volume_growth_reservation_test.go new file mode 100644 index 000000000..7b06e626d --- /dev/null +++ b/weed/topology/volume_growth_reservation_test.go @@ -0,0 +1,276 @@ +package topology + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// MockGrpcDialOption simulates grpc connection for testing +type MockGrpcDialOption struct{} + +// simulateVolumeAllocation mocks the volume allocation process +func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error { + // Simulate some processing time + time.Sleep(time.Millisecond * 10) + return nil +} + +func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) { + // Create test topology with single server for predictable behavior + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + + // Create data center and rack + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + // Create single data node with limited capacity + dn := NewDataNode("server1") + rack.LinkChildNode(dn) + + // Set up disk with limited capacity (only 5 volumes) + disk := NewDisk(types.HardDriveType.String()) + disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5 + dn.LinkChildNode(disk) + + // Test volume growth with reservation + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas) + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Try to create volumes and verify reservations work + for i := 0; i < 5; i++ { + servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err != nil { + t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err) + continue + } + + if len(servers) != 1 { + t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers)) + } + + if len(reservation.reservationIds) != 1 { + t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds)) + } + + // Verify the reservation is on our expected server + server := servers[0] + if server != dn { + t.Errorf("Expected volume to be allocated on server1, got %s", server.Id()) + } + + // Check available space before and after reservation + availableBeforeCreation := server.AvailableSpaceFor(option) + expectedBefore := int64(5 - i) + if availableBeforeCreation != expectedBefore { + t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation) + } + + // Simulate successful volume creation + disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) + deltaDiskUsage := &DiskUsageCounts{ + volumeCount: 1, + } + disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + + // Release reservation after successful creation + reservation.releaseAllReservations() + + // Verify available space after creation + availableAfterCreation := server.AvailableSpaceFor(option) + expectedAfter := int64(5 - i - 1) + if availableAfterCreation != expectedAfter { + t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation) + } + } + + // After 5 volumes, should have no more capacity + _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err == nil { + t.Error("Expected volume allocation to fail when server is at capacity") + } +} + +func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) { + // Create test topology with very limited capacity + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + // Single data node with capacity for only 5 volumes + dn := NewDataNode("server1") + rack.LinkChildNode(dn) + + disk := NewDisk(types.HardDriveType.String()) + disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5 + dn.LinkChildNode(disk) + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas) + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // Simulate concurrent volume creation attempts + const concurrentRequests = 10 + var wg sync.WaitGroup + var successCount, failureCount atomic.Int32 + + for i := 0; i < concurrentRequests; i++ { + wg.Add(1) + go func(requestId int) { + defer wg.Done() + + _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true) + + if err != nil { + failureCount.Add(1) + t.Logf("Request %d failed as expected: %v", requestId, err) + } else { + successCount.Add(1) + t.Logf("Request %d succeeded, got reservation", requestId) + + // Release the reservation to simulate completion + if reservation != nil { + reservation.releaseAllReservations() + // Simulate volume creation by incrementing count + disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) + deltaDiskUsage := &DiskUsageCounts{ + volumeCount: 1, + } + disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + } + } + }(i) + } + + wg.Wait() + + // With reservation system, only 5 requests should succeed (capacity limit) + // The rest should fail due to insufficient capacity + if successCount.Load() != 5 { + t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load()) + } + + if failureCount.Load() != 5 { + t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load()) + } + + // Verify final state + finalAvailable := dn.AvailableSpaceFor(option) + if finalAvailable != 0 { + t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable) + } + + t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load()) +} + +func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) { + // Create topology with multiple servers, but limited total capacity + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + + dc := NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := NewRack("rack1") + dc.LinkChildNode(rack) + + // Create two servers with different available capacity + dn1 := NewDataNode("server1") + dn2 := NewDataNode("server2") + rack.LinkChildNode(dn1) + rack.LinkChildNode(dn2) + + // Server 1: 5 available slots + disk1 := NewDisk(types.HardDriveType.String()) + disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5 + dn1.LinkChildNode(disk1) + + // Server 2: 0 available slots (full) + disk2 := NewDisk(types.HardDriveType.String()) + diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType) + diskUsage2.maxVolumeCount = 5 + diskUsage2.volumeCount = 5 + dn2.LinkChildNode(disk2) + + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas + + option := &VolumeGrowOption{ + Collection: "test", + ReplicaPlacement: rp, + DiskType: types.HardDriveType, + } + + // This should fail because we can't satisfy replica requirements + // (need 2 servers but only 1 has space) + _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true) + if err == nil { + t.Error("Expected reservation to fail due to insufficient replica capacity") + } + + // Verify no reservations are left hanging + available1 := dn1.AvailableSpaceForReservation(option) + if available1 != 5 { + t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1) + } + + available2 := dn2.AvailableSpaceForReservation(option) + if available2 != 0 { + t.Errorf("Expected server2 to have no capacity available, got %d", available2) + } +} + +func TestVolumeGrowth_ReservationTimeout(t *testing.T) { + dn := NewDataNode("server1") + diskType := types.HardDriveType + + // Set up capacity + diskUsage := dn.diskUsages.getOrCreateDisk(diskType) + diskUsage.maxVolumeCount = 5 + + // Create a reservation + reservationId, success := dn.TryReserveCapacity(diskType, 2) + if !success { + t.Fatal("Expected successful reservation") + } + + // Manually set the reservation time to simulate old reservation + dn.capacityReservations.Lock() + if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists { + reservation.createdAt = time.Now().Add(-10 * time.Minute) + } + dn.capacityReservations.Unlock() + + // Try another reservation - this should trigger cleanup and succeed + _, success = dn.TryReserveCapacity(diskType, 3) + if !success { + t.Error("Expected reservation to succeed after cleanup of expired reservation") + } + + // Original reservation should be cleaned up + option := &VolumeGrowOption{DiskType: diskType} + available := dn.AvailableSpaceForReservation(option) + if available != 2 { // 5 - 3 = 2 + t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available) + } +} |
