aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/rdma-engine/src/session.rs
diff options
context:
space:
mode:
Diffstat (limited to 'seaweedfs-rdma-sidecar/rdma-engine/src/session.rs')
-rw-r--r--seaweedfs-rdma-sidecar/rdma-engine/src/session.rs587
1 files changed, 587 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/rdma-engine/src/session.rs b/seaweedfs-rdma-sidecar/rdma-engine/src/session.rs
new file mode 100644
index 000000000..fa089c72a
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/rdma-engine/src/session.rs
@@ -0,0 +1,587 @@
+//! Session management for RDMA operations
+//!
+//! This module manages the lifecycle of RDMA sessions, including creation,
+//! storage, expiration, and cleanup of resources.
+
+use crate::{RdmaError, RdmaResult, rdma::MemoryRegion};
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::{Duration, Instant};
+use tracing::{debug, info};
+// use uuid::Uuid; // Unused for now
+
+/// RDMA session state
+#[derive(Debug, Clone)]
+pub struct RdmaSession {
+ /// Unique session identifier
+ pub id: String,
+ /// SeaweedFS volume ID
+ pub volume_id: u32,
+ /// SeaweedFS needle ID
+ pub needle_id: u64,
+ /// Remote memory address
+ pub remote_addr: u64,
+ /// Remote key for RDMA access
+ pub remote_key: u32,
+ /// Transfer size in bytes
+ pub transfer_size: u64,
+ /// Local data buffer
+ pub buffer: Vec<u8>,
+ /// RDMA memory region
+ pub memory_region: MemoryRegion,
+ /// Session creation time
+ pub created_at: Instant,
+ /// Session expiration time
+ pub expires_at: Instant,
+ /// Current session state
+ pub state: SessionState,
+ /// Operation statistics
+ pub stats: SessionStats,
+}
+
+/// Session state enum
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum SessionState {
+ /// Session created but not yet active
+ Created,
+ /// RDMA operation in progress
+ Active,
+ /// Operation completed successfully
+ Completed,
+ /// Operation failed
+ Failed,
+ /// Session expired
+ Expired,
+ /// Session being cleaned up
+ CleaningUp,
+}
+
+/// Session operation statistics
+#[derive(Debug, Clone, Default)]
+pub struct SessionStats {
+ /// Number of RDMA operations performed
+ pub operations_count: u64,
+ /// Total bytes transferred
+ pub bytes_transferred: u64,
+ /// Time spent in RDMA operations (nanoseconds)
+ pub rdma_time_ns: u64,
+ /// Number of completion polling attempts
+ pub poll_attempts: u64,
+ /// Time of last operation
+ pub last_operation_at: Option<Instant>,
+}
+
+impl RdmaSession {
+ /// Create a new RDMA session
+ pub fn new(
+ id: String,
+ volume_id: u32,
+ needle_id: u64,
+ remote_addr: u64,
+ remote_key: u32,
+ transfer_size: u64,
+ buffer: Vec<u8>,
+ memory_region: MemoryRegion,
+ timeout: Duration,
+ ) -> Self {
+ let now = Instant::now();
+
+ Self {
+ id,
+ volume_id,
+ needle_id,
+ remote_addr,
+ remote_key,
+ transfer_size,
+ buffer,
+ memory_region,
+ created_at: now,
+ expires_at: now + timeout,
+ state: SessionState::Created,
+ stats: SessionStats::default(),
+ }
+ }
+
+ /// Check if session has expired
+ pub fn is_expired(&self) -> bool {
+ Instant::now() > self.expires_at
+ }
+
+ /// Get session age in seconds
+ pub fn age_secs(&self) -> f64 {
+ self.created_at.elapsed().as_secs_f64()
+ }
+
+ /// Get time until expiration in seconds
+ pub fn time_to_expiration_secs(&self) -> f64 {
+ if self.is_expired() {
+ 0.0
+ } else {
+ (self.expires_at - Instant::now()).as_secs_f64()
+ }
+ }
+
+ /// Update session state
+ pub fn set_state(&mut self, state: SessionState) {
+ debug!("Session {} state: {:?} -> {:?}", self.id, self.state, state);
+ self.state = state;
+ }
+
+ /// Record RDMA operation statistics
+ pub fn record_operation(&mut self, bytes_transferred: u64, duration_ns: u64) {
+ self.stats.operations_count += 1;
+ self.stats.bytes_transferred += bytes_transferred;
+ self.stats.rdma_time_ns += duration_ns;
+ self.stats.last_operation_at = Some(Instant::now());
+ }
+
+ /// Get average operation latency in nanoseconds
+ pub fn avg_operation_latency_ns(&self) -> u64 {
+ if self.stats.operations_count > 0 {
+ self.stats.rdma_time_ns / self.stats.operations_count
+ } else {
+ 0
+ }
+ }
+
+ /// Get throughput in bytes per second
+ pub fn throughput_bps(&self) -> f64 {
+ let age_secs = self.age_secs();
+ if age_secs > 0.0 {
+ self.stats.bytes_transferred as f64 / age_secs
+ } else {
+ 0.0
+ }
+ }
+}
+
+/// Session manager for handling multiple concurrent RDMA sessions
+pub struct SessionManager {
+ /// Active sessions
+ sessions: Arc<RwLock<HashMap<String, Arc<RwLock<RdmaSession>>>>>,
+ /// Maximum number of concurrent sessions
+ max_sessions: usize,
+ /// Default session timeout
+ #[allow(dead_code)]
+ default_timeout: Duration,
+ /// Cleanup task handle
+ cleanup_task: RwLock<Option<tokio::task::JoinHandle<()>>>,
+ /// Shutdown flag
+ shutdown_flag: Arc<RwLock<bool>>,
+ /// Statistics
+ stats: Arc<RwLock<SessionManagerStats>>,
+}
+
+/// Session manager statistics
+#[derive(Debug, Clone, Default)]
+pub struct SessionManagerStats {
+ /// Total sessions created
+ pub total_sessions_created: u64,
+ /// Total sessions completed
+ pub total_sessions_completed: u64,
+ /// Total sessions failed
+ pub total_sessions_failed: u64,
+ /// Total sessions expired
+ pub total_sessions_expired: u64,
+ /// Total bytes transferred across all sessions
+ pub total_bytes_transferred: u64,
+ /// Manager start time
+ pub started_at: Option<Instant>,
+}
+
+impl SessionManager {
+ /// Create new session manager
+ pub fn new(max_sessions: usize, default_timeout: Duration) -> Self {
+ info!("🎯 Session manager initialized: max_sessions={}, timeout={:?}",
+ max_sessions, default_timeout);
+
+ let mut stats = SessionManagerStats::default();
+ stats.started_at = Some(Instant::now());
+
+ Self {
+ sessions: Arc::new(RwLock::new(HashMap::new())),
+ max_sessions,
+ default_timeout,
+ cleanup_task: RwLock::new(None),
+ shutdown_flag: Arc::new(RwLock::new(false)),
+ stats: Arc::new(RwLock::new(stats)),
+ }
+ }
+
+ /// Create a new RDMA session
+ pub async fn create_session(
+ &self,
+ session_id: String,
+ volume_id: u32,
+ needle_id: u64,
+ remote_addr: u64,
+ remote_key: u32,
+ transfer_size: u64,
+ buffer: Vec<u8>,
+ memory_region: MemoryRegion,
+ timeout: chrono::Duration,
+ ) -> RdmaResult<Arc<RwLock<RdmaSession>>> {
+ // Check session limit
+ {
+ let sessions = self.sessions.read();
+ if sessions.len() >= self.max_sessions {
+ return Err(RdmaError::TooManySessions {
+ max_sessions: self.max_sessions
+ });
+ }
+
+ // Check if session already exists
+ if sessions.contains_key(&session_id) {
+ return Err(RdmaError::invalid_request(
+ format!("Session {} already exists", session_id)
+ ));
+ }
+ }
+
+ let timeout_duration = Duration::from_millis(timeout.num_milliseconds().max(1) as u64);
+
+ let session = Arc::new(RwLock::new(RdmaSession::new(
+ session_id.clone(),
+ volume_id,
+ needle_id,
+ remote_addr,
+ remote_key,
+ transfer_size,
+ buffer,
+ memory_region,
+ timeout_duration,
+ )));
+
+ // Store session
+ {
+ let mut sessions = self.sessions.write();
+ sessions.insert(session_id.clone(), session.clone());
+ }
+
+ // Update stats
+ {
+ let mut stats = self.stats.write();
+ stats.total_sessions_created += 1;
+ }
+
+ info!("📦 Created session {}: volume={}, needle={}, size={}",
+ session_id, volume_id, needle_id, transfer_size);
+
+ Ok(session)
+ }
+
+ /// Get session by ID
+ pub async fn get_session(&self, session_id: &str) -> RdmaResult<Arc<RwLock<RdmaSession>>> {
+ let sessions = self.sessions.read();
+ match sessions.get(session_id) {
+ Some(session) => {
+ if session.read().is_expired() {
+ Err(RdmaError::SessionExpired {
+ session_id: session_id.to_string()
+ })
+ } else {
+ Ok(session.clone())
+ }
+ }
+ None => Err(RdmaError::SessionNotFound {
+ session_id: session_id.to_string()
+ }),
+ }
+ }
+
+ /// Remove and cleanup session
+ pub async fn remove_session(&self, session_id: &str) -> RdmaResult<()> {
+ let session = {
+ let mut sessions = self.sessions.write();
+ sessions.remove(session_id)
+ };
+
+ if let Some(session) = session {
+ let session_data = session.read();
+ info!("🗑️ Removed session {}: stats={:?}", session_id, session_data.stats);
+
+ // Update manager stats
+ {
+ let mut stats = self.stats.write();
+ match session_data.state {
+ SessionState::Completed => stats.total_sessions_completed += 1,
+ SessionState::Failed => stats.total_sessions_failed += 1,
+ SessionState::Expired => stats.total_sessions_expired += 1,
+ _ => {}
+ }
+ stats.total_bytes_transferred += session_data.stats.bytes_transferred;
+ }
+
+ Ok(())
+ } else {
+ Err(RdmaError::SessionNotFound {
+ session_id: session_id.to_string()
+ })
+ }
+ }
+
+ /// Get active session count
+ pub async fn active_session_count(&self) -> usize {
+ self.sessions.read().len()
+ }
+
+ /// Get maximum sessions allowed
+ pub fn max_sessions(&self) -> usize {
+ self.max_sessions
+ }
+
+ /// List active sessions
+ pub async fn list_sessions(&self) -> Vec<String> {
+ self.sessions.read().keys().cloned().collect()
+ }
+
+ /// Get session statistics
+ pub async fn get_session_stats(&self, session_id: &str) -> RdmaResult<SessionStats> {
+ let session = self.get_session(session_id).await?;
+ let stats = {
+ let session_data = session.read();
+ session_data.stats.clone()
+ };
+ Ok(stats)
+ }
+
+ /// Get manager statistics
+ pub fn get_manager_stats(&self) -> SessionManagerStats {
+ self.stats.read().clone()
+ }
+
+ /// Start background cleanup task
+ pub async fn start_cleanup_task(&self) {
+ info!("📋 Session cleanup task initialized");
+
+ let sessions = Arc::clone(&self.sessions);
+ let shutdown_flag = Arc::clone(&self.shutdown_flag);
+ let stats = Arc::clone(&self.stats);
+
+ let task = tokio::spawn(async move {
+ let mut interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
+
+ loop {
+ interval.tick().await;
+
+ // Check shutdown flag
+ if *shutdown_flag.read() {
+ debug!("🛑 Session cleanup task shutting down");
+ break;
+ }
+
+ let now = Instant::now();
+ let mut expired_sessions = Vec::new();
+
+ // Find expired sessions
+ {
+ let sessions_guard = sessions.read();
+ for (session_id, session) in sessions_guard.iter() {
+ if now > session.read().expires_at {
+ expired_sessions.push(session_id.clone());
+ }
+ }
+ }
+
+ // Remove expired sessions
+ if !expired_sessions.is_empty() {
+ let mut sessions_guard = sessions.write();
+ let mut stats_guard = stats.write();
+
+ for session_id in expired_sessions {
+ if let Some(session) = sessions_guard.remove(&session_id) {
+ let session_data = session.read();
+ info!("🗑️ Cleaned up expired session: {} (volume={}, needle={})",
+ session_id, session_data.volume_id, session_data.needle_id);
+ stats_guard.total_sessions_expired += 1;
+ }
+ }
+
+ debug!("📊 Active sessions: {}", sessions_guard.len());
+ }
+ }
+ });
+
+ *self.cleanup_task.write() = Some(task);
+ }
+
+ /// Shutdown session manager
+ pub async fn shutdown(&self) {
+ info!("🛑 Shutting down session manager");
+ *self.shutdown_flag.write() = true;
+
+ // Wait for cleanup task to finish
+ if let Some(task) = self.cleanup_task.write().take() {
+ let _ = task.await;
+ }
+
+ // Clean up all remaining sessions
+ let session_ids: Vec<String> = {
+ self.sessions.read().keys().cloned().collect()
+ };
+
+ for session_id in session_ids {
+ let _ = self.remove_session(&session_id).await;
+ }
+
+ let final_stats = self.get_manager_stats();
+ info!("📈 Final session manager stats: {:?}", final_stats);
+ }
+
+ /// Force cleanup of all sessions (for testing)
+ #[cfg(test)]
+ pub async fn cleanup_all_sessions(&self) {
+ let session_ids: Vec<String> = {
+ self.sessions.read().keys().cloned().collect()
+ };
+
+ for session_id in session_ids {
+ let _ = self.remove_session(&session_id).await;
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::rdma::MemoryRegion;
+
+ #[tokio::test]
+ async fn test_session_creation() {
+ let manager = SessionManager::new(10, Duration::from_secs(60));
+
+ let memory_region = MemoryRegion {
+ addr: 0x1000,
+ rkey: 0x12345678,
+ lkey: 0x87654321,
+ size: 4096,
+ registered: true,
+ };
+
+ let session = manager.create_session(
+ "test-session".to_string(),
+ 1,
+ 100,
+ 0x2000,
+ 0xabcd,
+ 4096,
+ vec![0; 4096],
+ memory_region,
+ chrono::Duration::seconds(60),
+ ).await.unwrap();
+
+ let session_data = session.read();
+ assert_eq!(session_data.id, "test-session");
+ assert_eq!(session_data.volume_id, 1);
+ assert_eq!(session_data.needle_id, 100);
+ assert_eq!(session_data.state, SessionState::Created);
+ assert!(!session_data.is_expired());
+ }
+
+ #[tokio::test]
+ async fn test_session_expiration() {
+ let manager = SessionManager::new(10, Duration::from_millis(10));
+
+ let memory_region = MemoryRegion {
+ addr: 0x1000,
+ rkey: 0x12345678,
+ lkey: 0x87654321,
+ size: 4096,
+ registered: true,
+ };
+
+ let _session = manager.create_session(
+ "expire-test".to_string(),
+ 1,
+ 100,
+ 0x2000,
+ 0xabcd,
+ 4096,
+ vec![0; 4096],
+ memory_region,
+ chrono::Duration::milliseconds(10),
+ ).await.unwrap();
+
+ // Wait for expiration
+ tokio::time::sleep(Duration::from_millis(20)).await;
+
+ let result = manager.get_session("expire-test").await;
+ assert!(matches!(result, Err(RdmaError::SessionExpired { .. })));
+ }
+
+ #[tokio::test]
+ async fn test_session_limit() {
+ let manager = SessionManager::new(2, Duration::from_secs(60));
+
+ let memory_region = MemoryRegion {
+ addr: 0x1000,
+ rkey: 0x12345678,
+ lkey: 0x87654321,
+ size: 4096,
+ registered: true,
+ };
+
+ // Create first session
+ let _session1 = manager.create_session(
+ "session1".to_string(),
+ 1, 100, 0x2000, 0xabcd, 4096,
+ vec![0; 4096],
+ memory_region.clone(),
+ chrono::Duration::seconds(60),
+ ).await.unwrap();
+
+ // Create second session
+ let _session2 = manager.create_session(
+ "session2".to_string(),
+ 1, 101, 0x3000, 0xabcd, 4096,
+ vec![0; 4096],
+ memory_region.clone(),
+ chrono::Duration::seconds(60),
+ ).await.unwrap();
+
+ // Third session should fail
+ let result = manager.create_session(
+ "session3".to_string(),
+ 1, 102, 0x4000, 0xabcd, 4096,
+ vec![0; 4096],
+ memory_region,
+ chrono::Duration::seconds(60),
+ ).await;
+
+ assert!(matches!(result, Err(RdmaError::TooManySessions { .. })));
+ }
+
+ #[tokio::test]
+ async fn test_session_stats() {
+ let manager = SessionManager::new(10, Duration::from_secs(60));
+
+ let memory_region = MemoryRegion {
+ addr: 0x1000,
+ rkey: 0x12345678,
+ lkey: 0x87654321,
+ size: 4096,
+ registered: true,
+ };
+
+ let session = manager.create_session(
+ "stats-test".to_string(),
+ 1, 100, 0x2000, 0xabcd, 4096,
+ vec![0; 4096],
+ memory_region,
+ chrono::Duration::seconds(60),
+ ).await.unwrap();
+
+ // Simulate some operations - now using proper interior mutability
+ {
+ let mut session_data = session.write();
+ session_data.record_operation(1024, 1000000); // 1KB in 1ms
+ session_data.record_operation(2048, 2000000); // 2KB in 2ms
+ }
+
+ let stats = manager.get_session_stats("stats-test").await.unwrap();
+ assert_eq!(stats.operations_count, 2);
+ assert_eq!(stats.bytes_transferred, 3072);
+ assert_eq!(stats.rdma_time_ns, 3000000);
+ }
+}