aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/rdma-engine/src/lib.rs
blob: c92dcf91ae857097efc28dd6d12855629dba1998 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! High-Performance RDMA Engine for SeaweedFS
//! 
//! This crate provides a high-performance RDMA (Remote Direct Memory Access) engine
//! designed to accelerate data transfer operations in SeaweedFS. It communicates with
//! the Go-based sidecar via IPC and handles the performance-critical RDMA operations.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────┐    IPC     ┌─────────────────────┐
//! │   Go Control Plane  │◄──────────►│  Rust Data Plane    │
//! │                     │  ~300ns    │                     │
//! │ • gRPC Server       │            │ • RDMA Operations   │
//! │ • Session Mgmt      │            │ • Memory Mgmt       │
//! │ • HTTP Fallback     │            │ • Hardware Access   │
//! │ • Error Handling    │            │ • Zero-Copy I/O     │
//! └─────────────────────┘            └─────────────────────┘
//! ```
//!
//! # Features
//!
//! - `mock-rdma` (default): Mock RDMA operations for testing and development
//! - `real-rdma`: Real RDMA hardware integration using rdma-core bindings

use std::sync::Arc;
use anyhow::Result;

pub mod ucx;
pub mod rdma;
pub mod ipc;
pub mod session;
pub mod memory;
pub mod error;

pub use error::{RdmaError, RdmaResult};

/// Configuration for the RDMA engine.
///
/// A value of this type is passed by reference to `rdma::RdmaContext::new`
/// and then stored on `RdmaEngine`. The `Default` implementation in this file
/// supplies a baseline value for every field.
#[derive(Debug, Clone)]
pub struct RdmaEngineConfig {
    /// RDMA device name (e.g., "mlx5_0").
    ///
    /// Not read directly in this file; presumably consumed by
    /// `rdma::RdmaContext::new` — confirm there.
    pub device_name: String,
    /// RDMA port number.
    ///
    /// Not read directly in this file; presumably consumed by
    /// `rdma::RdmaContext::new` — confirm there.
    pub port: u16,
    /// Maximum number of concurrent sessions.
    ///
    /// Forwarded to `session::SessionManager::new` when the engine is created.
    pub max_sessions: usize,
    /// Session timeout in seconds.
    ///
    /// Converted with `Duration::from_secs` and forwarded to
    /// `session::SessionManager::new` when the engine is created.
    pub session_timeout_secs: u64,
    /// Memory buffer size in bytes.
    ///
    /// Not read directly in this file; presumably consumed by
    /// `rdma::RdmaContext::new` — confirm there.
    pub buffer_size: usize,
    /// IPC socket path.
    ///
    /// Passed to `ipc::IpcServer::new` and logged when `RdmaEngine::run` starts.
    pub ipc_socket_path: String,
    /// Enable debug logging.
    ///
    /// NOTE(review): nothing in this file reads this flag; verify which
    /// component (if any) acts on it.
    pub debug: bool,
}

impl Default for RdmaEngineConfig {
    fn default() -> Self {
        Self {
            device_name: "mlx5_0".to_string(),
            port: 18515,
            max_sessions: 1000,
            session_timeout_secs: 300, // 5 minutes
            buffer_size: 1024 * 1024 * 1024, // 1GB
            ipc_socket_path: "/tmp/rdma-engine.sock".to_string(),
            debug: false,
        }
    }
}

/// Main RDMA engine instance.
///
/// Holds the configuration, shared handles to the RDMA context and the
/// session manager, and — once `run` has been called — the IPC server.
pub struct RdmaEngine {
    /// Configuration the engine was constructed with.
    config: RdmaEngineConfig,
    /// RDMA context built in `new`; a clone of this `Arc` is handed to the
    /// IPC server in `run`.
    rdma_context: Arc<rdma::RdmaContext>,
    /// Session manager built in `new`; shared with the IPC server and with
    /// the cleanup task spawned in `run`.
    session_manager: Arc<session::SessionManager>,
    /// `None` after `new`; set to `Some` by `run` and consulted by `shutdown`.
    ipc_server: Option<ipc::IpcServer>,
}

impl RdmaEngine {
    /// Create a new RDMA engine with the given configuration
    pub async fn new(config: RdmaEngineConfig) -> Result<Self> {
        tracing::info!("Initializing RDMA engine with config: {:?}", config);
        
        // Initialize RDMA context
        let rdma_context = Arc::new(rdma::RdmaContext::new(&config).await?);
        
        // Initialize session manager
        let session_manager = Arc::new(session::SessionManager::new(
            config.max_sessions,
            std::time::Duration::from_secs(config.session_timeout_secs),
        ));
        
        Ok(Self {
            config,
            rdma_context,
            session_manager,
            ipc_server: None,
        })
    }
    
    /// Start the RDMA engine server
    pub async fn run(&mut self) -> Result<()> {
        tracing::info!("Starting RDMA engine server on {}", self.config.ipc_socket_path);
        
        // Start IPC server
        let ipc_server = ipc::IpcServer::new(
            &self.config.ipc_socket_path,
            self.rdma_context.clone(),
            self.session_manager.clone(),
        ).await?;
        
        self.ipc_server = Some(ipc_server);
        
        // Start session cleanup task
        let session_manager = self.session_manager.clone();
        tokio::spawn(async move {
            session_manager.start_cleanup_task().await;
        });
        
        // Run IPC server
        if let Some(ref mut server) = self.ipc_server {
            server.run().await?;
        }
        
        Ok(())
    }
    
    /// Shutdown the RDMA engine
    pub async fn shutdown(&mut self) -> Result<()> {
        tracing::info!("Shutting down RDMA engine");
        
        if let Some(ref mut server) = self.ipc_server {
            server.shutdown().await?;
        }
        
        self.session_manager.shutdown().await;
        
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing an engine from the default configuration is expected to
    /// succeed when the crate is built with the mock RDMA backend.
    #[tokio::test]
    async fn test_rdma_engine_creation() {
        let outcome = RdmaEngine::new(RdmaEngineConfig::default()).await;

        // Report the underlying error on failure; a bare `is_ok()` assertion
        // would discard it and leave nothing to diagnose from the test log.
        if let Err(err) = outcome {
            panic!("RdmaEngine::new failed with the default config: {:?}", err);
        }
    }
}