diff options
Diffstat (limited to 'seaweedfs-rdma-sidecar/rdma-engine/src/memory.rs')
| -rw-r--r-- | seaweedfs-rdma-sidecar/rdma-engine/src/memory.rs | 630 |
1 files changed, 630 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/rdma-engine/src/memory.rs b/seaweedfs-rdma-sidecar/rdma-engine/src/memory.rs new file mode 100644 index 000000000..17a9a5b1d --- /dev/null +++ b/seaweedfs-rdma-sidecar/rdma-engine/src/memory.rs @@ -0,0 +1,630 @@ +//! Memory management for RDMA operations +//! +//! This module provides efficient memory allocation, registration, and management +//! for RDMA operations with zero-copy semantics and proper cleanup. + +use crate::{RdmaError, RdmaResult}; +use memmap2::MmapMut; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, info, warn}; + +/// Memory pool for efficient buffer allocation +pub struct MemoryPool { + /// Pre-allocated memory regions by size + pools: RwLock<HashMap<usize, Vec<PooledBuffer>>>, + /// Total allocated memory in bytes + total_allocated: RwLock<usize>, + /// Maximum pool size per buffer size + max_pool_size: usize, + /// Maximum total memory usage + max_total_memory: usize, + /// Statistics + stats: RwLock<MemoryPoolStats>, +} + +/// Statistics for memory pool +#[derive(Debug, Clone, Default)] +pub struct MemoryPoolStats { + /// Total allocations requested + pub total_allocations: u64, + /// Total deallocations + pub total_deallocations: u64, + /// Cache hits (reused buffers) + pub cache_hits: u64, + /// Cache misses (new allocations) + pub cache_misses: u64, + /// Current active allocations + pub active_allocations: usize, + /// Peak memory usage in bytes + pub peak_memory_usage: usize, +} + +/// A pooled memory buffer +pub struct PooledBuffer { + /// Raw buffer data + data: Vec<u8>, + /// Size of the buffer + size: usize, + /// Whether the buffer is currently in use + in_use: bool, + /// Creation timestamp + created_at: std::time::Instant, +} + +impl PooledBuffer { + /// Create new pooled buffer + fn new(size: usize) -> Self { + Self { + data: vec![0u8; size], + size, + in_use: false, + created_at: std::time::Instant::now(), + } + } + + /// Get buffer data as slice + pub fn as_slice(&self) -> &[u8] { + &self.data + } + + /// Get buffer data as mutable slice + pub fn as_mut_slice(&mut self) -> &mut [u8] { + &mut self.data + } + + /// Get buffer size + pub fn size(&self) -> usize { + self.size + } + + /// Get buffer age + pub fn age(&self) -> std::time::Duration { + self.created_at.elapsed() + } + + /// Get raw pointer to buffer data + pub fn as_ptr(&self) -> *const u8 { + self.data.as_ptr() + } + + /// Get mutable raw pointer to buffer data + pub fn as_mut_ptr(&mut self) -> *mut u8 { + self.data.as_mut_ptr() + } +} + +impl MemoryPool { + /// Create new memory pool + pub fn new(max_pool_size: usize, max_total_memory: usize) -> Self { + info!("π§ Memory pool initialized: max_pool_size={}, max_total_memory={} bytes", + max_pool_size, max_total_memory); + + Self { + pools: RwLock::new(HashMap::new()), + total_allocated: RwLock::new(0), + max_pool_size, + max_total_memory, + stats: RwLock::new(MemoryPoolStats::default()), + } + } + + /// Allocate buffer from pool + pub fn allocate(&self, size: usize) -> RdmaResult<Arc<RwLock<PooledBuffer>>> { + // Round up to next power of 2 for better pooling + let pool_size = size.next_power_of_two(); + + { + let mut stats = self.stats.write(); + stats.total_allocations += 1; + } + + // Try to get buffer from pool first + { + let mut pools = self.pools.write(); + if let Some(pool) = pools.get_mut(&pool_size) { + // Find available buffer in pool + for buffer in pool.iter_mut() { + if !buffer.in_use { + buffer.in_use = true; + + let mut stats = self.stats.write(); + stats.cache_hits += 1; + stats.active_allocations += 1; + + debug!("π¦ Reused buffer from pool: size={}", pool_size); + return Ok(Arc::new(RwLock::new(std::mem::replace( + buffer, + PooledBuffer::new(0) // Placeholder + )))); + } + } + } + } + + // No available buffer in pool, create new one + let total_allocated = *self.total_allocated.read(); + if total_allocated + pool_size > self.max_total_memory { + return Err(RdmaError::ResourceExhausted { + resource: "memory".to_string() + }); + } + + let mut buffer = PooledBuffer::new(pool_size); + buffer.in_use = true; + + // Update allocation tracking + let new_total = { + let mut total = self.total_allocated.write(); + *total += pool_size; + *total + }; + + { + let mut stats = self.stats.write(); + stats.cache_misses += 1; + stats.active_allocations += 1; + if new_total > stats.peak_memory_usage { + stats.peak_memory_usage = new_total; + } + } + + debug!("π Allocated new buffer: size={}, total_allocated={}", + pool_size, new_total); + + Ok(Arc::new(RwLock::new(buffer))) + } + + /// Return buffer to pool + pub fn deallocate(&self, buffer: Arc<RwLock<PooledBuffer>>) -> RdmaResult<()> { + let buffer_size = { + let buf = buffer.read(); + buf.size() + }; + + { + let mut stats = self.stats.write(); + stats.total_deallocations += 1; + stats.active_allocations = stats.active_allocations.saturating_sub(1); + } + + // Try to return buffer to pool + { + let mut pools = self.pools.write(); + let pool = pools.entry(buffer_size).or_insert_with(Vec::new); + + if pool.len() < self.max_pool_size { + // Reset buffer state and return to pool + if let Ok(buf) = Arc::try_unwrap(buffer) { + let mut buf = buf.into_inner(); + buf.in_use = false; + buf.data.fill(0); // Clear data for security + pool.push(buf); + + debug!("β»οΈ Returned buffer to pool: size={}", buffer_size); + return Ok(()); + } + } + } + + // Pool is full or buffer is still referenced, just track deallocation + { + let mut total = self.total_allocated.write(); + *total = total.saturating_sub(buffer_size); + } + + debug!("ποΈ Buffer deallocated (not pooled): size={}", buffer_size); + Ok(()) + } + + /// Get memory pool statistics + pub fn stats(&self) -> MemoryPoolStats { + self.stats.read().clone() + } + + /// Get current memory usage + pub fn current_usage(&self) -> usize { + *self.total_allocated.read() + } + + /// Clean up old unused buffers from pools + pub fn cleanup_old_buffers(&self, max_age: std::time::Duration) { + let mut cleaned_count = 0; + let mut cleaned_bytes = 0; + + { + let mut pools = self.pools.write(); + for (size, pool) in pools.iter_mut() { + pool.retain(|buffer| { + if buffer.age() > max_age && !buffer.in_use { + cleaned_count += 1; + cleaned_bytes += size; + false + } else { + true + } + }); + } + } + + if cleaned_count > 0 { + { + let mut total = self.total_allocated.write(); + *total = total.saturating_sub(cleaned_bytes); + } + + info!("π§Ή Cleaned up {} old buffers, freed {} bytes", + cleaned_count, cleaned_bytes); + } + } +} + +/// RDMA-specific memory manager +pub struct RdmaMemoryManager { + /// General purpose memory pool + pool: MemoryPool, + /// Memory-mapped regions for large allocations + mmapped_regions: RwLock<HashMap<u64, MmapRegion>>, + /// HugePage allocations (if available) + hugepage_regions: RwLock<HashMap<u64, HugePageRegion>>, + /// Configuration + config: MemoryConfig, +} + +/// Memory configuration +#[derive(Debug, Clone)] +pub struct MemoryConfig { + /// Use hugepages for large allocations + pub use_hugepages: bool, + /// Hugepage size in bytes + pub hugepage_size: usize, + /// Memory pool settings + pub pool_max_size: usize, + /// Maximum total memory usage + pub max_total_memory: usize, + /// Buffer cleanup interval + pub cleanup_interval_secs: u64, +} + +impl Default for MemoryConfig { + fn default() -> Self { + Self { + use_hugepages: true, + hugepage_size: 2 * 1024 * 1024, // 2MB + pool_max_size: 1000, + max_total_memory: 8 * 1024 * 1024 * 1024, // 8GB + cleanup_interval_secs: 300, // 5 minutes + } + } +} + +/// Memory-mapped region +#[allow(dead_code)] +struct MmapRegion { + mmap: MmapMut, + size: usize, + created_at: std::time::Instant, +} + +/// HugePage memory region +#[allow(dead_code)] +struct HugePageRegion { + addr: *mut u8, + size: usize, + created_at: std::time::Instant, +} + +unsafe impl Send for HugePageRegion {} +unsafe impl Sync for HugePageRegion {} + +impl RdmaMemoryManager { + /// Create new RDMA memory manager + pub fn new(config: MemoryConfig) -> Self { + let pool = MemoryPool::new(config.pool_max_size, config.max_total_memory); + + Self { + pool, + mmapped_regions: RwLock::new(HashMap::new()), + hugepage_regions: RwLock::new(HashMap::new()), + config, + } + } + + /// Allocate memory optimized for RDMA operations + pub fn allocate_rdma_buffer(&self, size: usize) -> RdmaResult<RdmaBuffer> { + if size >= self.config.hugepage_size && self.config.use_hugepages { + self.allocate_hugepage_buffer(size) + } else if size >= 64 * 1024 { // Use mmap for large buffers + self.allocate_mmap_buffer(size) + } else { + self.allocate_pool_buffer(size) + } + } + + /// Allocate buffer from memory pool + fn allocate_pool_buffer(&self, size: usize) -> RdmaResult<RdmaBuffer> { + let buffer = self.pool.allocate(size)?; + Ok(RdmaBuffer::Pool { buffer, size }) + } + + /// Allocate memory-mapped buffer + fn allocate_mmap_buffer(&self, size: usize) -> RdmaResult<RdmaBuffer> { + let mmap = MmapMut::map_anon(size) + .map_err(|e| RdmaError::memory_reg_failed(format!("mmap failed: {}", e)))?; + + let addr = mmap.as_ptr() as u64; + let region = MmapRegion { + mmap, + size, + created_at: std::time::Instant::now(), + }; + + { + let mut regions = self.mmapped_regions.write(); + regions.insert(addr, region); + } + + debug!("πΊοΈ Allocated mmap buffer: addr=0x{:x}, size={}", addr, size); + Ok(RdmaBuffer::Mmap { addr, size }) + } + + /// Allocate hugepage buffer (Linux-specific) + fn allocate_hugepage_buffer(&self, size: usize) -> RdmaResult<RdmaBuffer> { + #[cfg(target_os = "linux")] + { + use nix::sys::mman::{mmap, MapFlags, ProtFlags}; + + // Round up to hugepage boundary + let aligned_size = (size + self.config.hugepage_size - 1) & !(self.config.hugepage_size - 1); + + let addr = unsafe { + // For anonymous mapping, we can use -1 as the file descriptor + use std::os::fd::BorrowedFd; + let fake_fd = BorrowedFd::borrow_raw(-1); // Anonymous mapping uses -1 + + mmap( + None, // ptr::null_mut() -> None + std::num::NonZero::new(aligned_size).unwrap(), // aligned_size -> NonZero<usize> + ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, + MapFlags::MAP_PRIVATE | MapFlags::MAP_ANONYMOUS | MapFlags::MAP_HUGETLB, + Some(&fake_fd), // Use borrowed FD for -1 wrapped in Some + 0, + ) + }; + + match addr { + Ok(addr) => { + let addr_u64 = addr as u64; + let region = HugePageRegion { + addr: addr as *mut u8, + size: aligned_size, + created_at: std::time::Instant::now(), + }; + + { + let mut regions = self.hugepage_regions.write(); + regions.insert(addr_u64, region); + } + + info!("π₯ Allocated hugepage buffer: addr=0x{:x}, size={}", addr_u64, aligned_size); + Ok(RdmaBuffer::HugePage { addr: addr_u64, size: aligned_size }) + } + Err(_) => { + warn!("Failed to allocate hugepage buffer, falling back to mmap"); + self.allocate_mmap_buffer(size) + } + } + } + + #[cfg(not(target_os = "linux"))] + { + warn!("HugePages not supported on this platform, using mmap"); + self.allocate_mmap_buffer(size) + } + } + + /// Deallocate RDMA buffer + pub fn deallocate_buffer(&self, buffer: RdmaBuffer) -> RdmaResult<()> { + match buffer { + RdmaBuffer::Pool { buffer, .. } => { + self.pool.deallocate(buffer) + } + RdmaBuffer::Mmap { addr, .. } => { + let mut regions = self.mmapped_regions.write(); + regions.remove(&addr); + debug!("ποΈ Deallocated mmap buffer: addr=0x{:x}", addr); + Ok(()) + } + RdmaBuffer::HugePage { addr, size } => { + { + let mut regions = self.hugepage_regions.write(); + regions.remove(&addr); + } + + #[cfg(target_os = "linux")] + { + use nix::sys::mman::munmap; + unsafe { + let _ = munmap(addr as *mut std::ffi::c_void, size); + } + } + + debug!("ποΈ Deallocated hugepage buffer: addr=0x{:x}, size={}", addr, size); + Ok(()) + } + } + } + + /// Get memory manager statistics + pub fn stats(&self) -> MemoryManagerStats { + let pool_stats = self.pool.stats(); + let mmap_count = self.mmapped_regions.read().len(); + let hugepage_count = self.hugepage_regions.read().len(); + + MemoryManagerStats { + pool_stats, + mmap_regions: mmap_count, + hugepage_regions: hugepage_count, + total_memory_usage: self.pool.current_usage(), + } + } + + /// Start background cleanup task + pub async fn start_cleanup_task(&self) -> tokio::task::JoinHandle<()> { + let pool = MemoryPool::new(self.config.pool_max_size, self.config.max_total_memory); + let cleanup_interval = std::time::Duration::from_secs(self.config.cleanup_interval_secs); + + tokio::spawn(async move { + let mut interval = tokio::time::interval( + tokio::time::Duration::from_secs(300) // 5 minutes + ); + + loop { + interval.tick().await; + pool.cleanup_old_buffers(cleanup_interval); + } + }) + } +} + +/// RDMA buffer types +pub enum RdmaBuffer { + /// Buffer from memory pool + Pool { + buffer: Arc<RwLock<PooledBuffer>>, + size: usize, + }, + /// Memory-mapped buffer + Mmap { + addr: u64, + size: usize, + }, + /// HugePage buffer + HugePage { + addr: u64, + size: usize, + }, +} + +impl RdmaBuffer { + /// Get buffer address + pub fn addr(&self) -> u64 { + match self { + Self::Pool { buffer, .. } => { + buffer.read().as_ptr() as u64 + } + Self::Mmap { addr, .. } => *addr, + Self::HugePage { addr, .. } => *addr, + } + } + + /// Get buffer size + pub fn size(&self) -> usize { + match self { + Self::Pool { size, .. } => *size, + Self::Mmap { size, .. } => *size, + Self::HugePage { size, .. } => *size, + } + } + + /// Get buffer as Vec (copy to avoid lifetime issues) + pub fn to_vec(&self) -> Vec<u8> { + match self { + Self::Pool { buffer, .. } => { + buffer.read().as_slice().to_vec() + } + Self::Mmap { addr, size } => { + unsafe { + let slice = std::slice::from_raw_parts(*addr as *const u8, *size); + slice.to_vec() + } + } + Self::HugePage { addr, size } => { + unsafe { + let slice = std::slice::from_raw_parts(*addr as *const u8, *size); + slice.to_vec() + } + } + } + } + + /// Get buffer type name + pub fn buffer_type(&self) -> &'static str { + match self { + Self::Pool { .. } => "pool", + Self::Mmap { .. } => "mmap", + Self::HugePage { .. } => "hugepage", + } + } +} + +/// Memory manager statistics +#[derive(Debug, Clone)] +pub struct MemoryManagerStats { + /// Pool statistics + pub pool_stats: MemoryPoolStats, + /// Number of mmap regions + pub mmap_regions: usize, + /// Number of hugepage regions + pub hugepage_regions: usize, + /// Total memory usage in bytes + pub total_memory_usage: usize, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_memory_pool_allocation() { + let pool = MemoryPool::new(10, 1024 * 1024); + + let buffer1 = pool.allocate(4096).unwrap(); + let buffer2 = pool.allocate(4096).unwrap(); + + assert_eq!(buffer1.read().size(), 4096); + assert_eq!(buffer2.read().size(), 4096); + + let stats = pool.stats(); + assert_eq!(stats.total_allocations, 2); + assert_eq!(stats.cache_misses, 2); + } + + #[test] + fn test_memory_pool_reuse() { + let pool = MemoryPool::new(10, 1024 * 1024); + + // Allocate and deallocate + let buffer = pool.allocate(4096).unwrap(); + let size = buffer.read().size(); + pool.deallocate(buffer).unwrap(); + + // Allocate again - should reuse + let buffer2 = pool.allocate(4096).unwrap(); + assert_eq!(buffer2.read().size(), size); + + let stats = pool.stats(); + assert_eq!(stats.cache_hits, 1); + } + + #[tokio::test] + async fn test_rdma_memory_manager() { + let config = MemoryConfig::default(); + let manager = RdmaMemoryManager::new(config); + + // Test small buffer (pool) + let small_buffer = manager.allocate_rdma_buffer(1024).unwrap(); + assert_eq!(small_buffer.size(), 1024); + assert_eq!(small_buffer.buffer_type(), "pool"); + + // Test large buffer (mmap) + let large_buffer = manager.allocate_rdma_buffer(128 * 1024).unwrap(); + assert_eq!(large_buffer.size(), 128 * 1024); + assert_eq!(large_buffer.buffer_type(), "mmap"); + + // Clean up + manager.deallocate_buffer(small_buffer).unwrap(); + manager.deallocate_buffer(large_buffer).unwrap(); + } +} |
