aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/pkg/rdma/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'seaweedfs-rdma-sidecar/pkg/rdma/client.go')
-rw-r--r--seaweedfs-rdma-sidecar/pkg/rdma/client.go630
1 files changed, 630 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/pkg/rdma/client.go b/seaweedfs-rdma-sidecar/pkg/rdma/client.go
new file mode 100644
index 000000000..156bb5497
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/pkg/rdma/client.go
@@ -0,0 +1,630 @@
+// Package rdma provides high-level RDMA operations for SeaweedFS integration
+package rdma
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "seaweedfs-rdma-sidecar/pkg/ipc"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/sirupsen/logrus"
+)
+
+// PooledConnection represents a pooled RDMA connection
+type PooledConnection struct {
+ ipcClient *ipc.Client
+ lastUsed time.Time
+ inUse bool
+ sessionID string
+ created time.Time
+}
+
+// ConnectionPool manages a pool of RDMA connections
+type ConnectionPool struct {
+ connections []*PooledConnection
+ mutex sync.RWMutex
+ maxConnections int
+ maxIdleTime time.Duration
+ enginePath string
+ logger *logrus.Logger
+}
+
+// Client provides high-level RDMA operations with connection pooling
+type Client struct {
+ pool *ConnectionPool
+ logger *logrus.Logger
+ enginePath string
+ capabilities *ipc.GetCapabilitiesResponse
+ connected bool
+ defaultTimeout time.Duration
+
+ // Legacy single connection (for backward compatibility)
+ ipcClient *ipc.Client
+}
+
+// Config holds configuration for the RDMA client
+type Config struct {
+ EngineSocketPath string
+ DefaultTimeout time.Duration
+ Logger *logrus.Logger
+
+ // Connection pooling options
+ EnablePooling bool // Enable connection pooling (default: true)
+ MaxConnections int // Max connections in pool (default: 10)
+ MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
+}
+
+// ReadRequest represents a SeaweedFS needle read request
+type ReadRequest struct {
+ VolumeID uint32
+ NeedleID uint64
+ Cookie uint32
+ Offset uint64
+ Size uint64
+ AuthToken *string
+}
+
+// ReadResponse represents the result of an RDMA read operation
+type ReadResponse struct {
+ Data []byte
+ BytesRead uint64
+ Duration time.Duration
+ TransferRate float64
+ SessionID string
+ Success bool
+ Message string
+}
+
+// NewConnectionPool creates a new connection pool
+func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool {
+ if maxConnections <= 0 {
+ maxConnections = 10 // Default
+ }
+ if maxIdleTime <= 0 {
+ maxIdleTime = 5 * time.Minute // Default
+ }
+
+ return &ConnectionPool{
+ connections: make([]*PooledConnection, 0, maxConnections),
+ maxConnections: maxConnections,
+ maxIdleTime: maxIdleTime,
+ enginePath: enginePath,
+ logger: logger,
+ }
+}
+
+// getConnection gets an available connection from the pool or creates a new one
+func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ // Look for an available connection
+ for _, conn := range p.connections {
+ if !conn.inUse && time.Since(conn.lastUsed) < p.maxIdleTime {
+ conn.inUse = true
+ conn.lastUsed = time.Now()
+ p.logger.WithField("session_id", conn.sessionID).Debug("๐Ÿ”Œ Reusing pooled RDMA connection")
+ return conn, nil
+ }
+ }
+
+ // Create new connection if under limit
+ if len(p.connections) < p.maxConnections {
+ ipcClient := ipc.NewClient(p.enginePath, p.logger)
+ if err := ipcClient.Connect(ctx); err != nil {
+ return nil, fmt.Errorf("failed to create new pooled connection: %w", err)
+ }
+
+ conn := &PooledConnection{
+ ipcClient: ipcClient,
+ lastUsed: time.Now(),
+ inUse: true,
+ sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), time.Now().Unix()),
+ created: time.Now(),
+ }
+
+ p.connections = append(p.connections, conn)
+ p.logger.WithFields(logrus.Fields{
+ "session_id": conn.sessionID,
+ "pool_size": len(p.connections),
+ }).Info("๐Ÿš€ Created new pooled RDMA connection")
+
+ return conn, nil
+ }
+
+ // Pool is full, wait for an available connection
+ return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections)
+}
+
+// releaseConnection returns a connection to the pool
+func (p *ConnectionPool) releaseConnection(conn *PooledConnection) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ conn.inUse = false
+ conn.lastUsed = time.Now()
+
+ p.logger.WithField("session_id", conn.sessionID).Debug("๐Ÿ”„ Released RDMA connection back to pool")
+}
+
+// cleanup removes idle connections from the pool
+func (p *ConnectionPool) cleanup() {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ now := time.Now()
+ activeConnections := make([]*PooledConnection, 0, len(p.connections))
+
+ for _, conn := range p.connections {
+ if conn.inUse || now.Sub(conn.lastUsed) < p.maxIdleTime {
+ activeConnections = append(activeConnections, conn)
+ } else {
+ // Close idle connection
+ conn.ipcClient.Disconnect()
+ p.logger.WithFields(logrus.Fields{
+ "session_id": conn.sessionID,
+ "idle_time": now.Sub(conn.lastUsed),
+ }).Debug("๐Ÿงน Cleaned up idle RDMA connection")
+ }
+ }
+
+ p.connections = activeConnections
+}
+
+// Close closes all connections in the pool
+func (p *ConnectionPool) Close() {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ for _, conn := range p.connections {
+ conn.ipcClient.Disconnect()
+ }
+ p.connections = nil
+ p.logger.Info("๐Ÿ”Œ Connection pool closed")
+}
+
+// NewClient creates a new RDMA client
+func NewClient(config *Config) *Client {
+ if config.Logger == nil {
+ config.Logger = logrus.New()
+ config.Logger.SetLevel(logrus.InfoLevel)
+ }
+
+ if config.DefaultTimeout == 0 {
+ config.DefaultTimeout = 30 * time.Second
+ }
+
+ client := &Client{
+ logger: config.Logger,
+ enginePath: config.EngineSocketPath,
+ defaultTimeout: config.DefaultTimeout,
+ }
+
+ // Initialize connection pooling if enabled (default: true)
+ enablePooling := config.EnablePooling
+ if config.MaxConnections == 0 && config.MaxIdleTime == 0 {
+ // Default to enabled if not explicitly configured
+ enablePooling = true
+ }
+
+ if enablePooling {
+ client.pool = NewConnectionPool(
+ config.EngineSocketPath,
+ config.MaxConnections,
+ config.MaxIdleTime,
+ config.Logger,
+ )
+
+ // Start cleanup goroutine
+ go client.startCleanupRoutine()
+
+ config.Logger.WithFields(logrus.Fields{
+ "max_connections": client.pool.maxConnections,
+ "max_idle_time": client.pool.maxIdleTime,
+ }).Info("๐Ÿ”Œ RDMA connection pooling enabled")
+ } else {
+ // Legacy single connection mode
+ client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger)
+ config.Logger.Info("๐Ÿ”Œ RDMA single connection mode (pooling disabled)")
+ }
+
+ return client
+}
+
+// startCleanupRoutine starts a background goroutine to clean up idle connections
+func (c *Client) startCleanupRoutine() {
+ ticker := time.NewTicker(1 * time.Minute) // Cleanup every minute
+ go func() {
+ defer ticker.Stop()
+ for range ticker.C {
+ if c.pool != nil {
+ c.pool.cleanup()
+ }
+ }
+ }()
+}
+
+// Connect establishes connection to the Rust RDMA engine and queries capabilities
+func (c *Client) Connect(ctx context.Context) error {
+ c.logger.Info("๐Ÿš€ Connecting to RDMA engine")
+
+ if c.pool != nil {
+ // Connection pooling mode - connections are created on-demand
+ c.connected = true
+ c.logger.Info("โœ… RDMA client ready (connection pooling enabled)")
+ return nil
+ }
+
+ // Single connection mode
+ if err := c.ipcClient.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect to IPC: %w", err)
+ }
+
+ // Test connectivity with ping
+ clientID := "rdma-client"
+ pong, err := c.ipcClient.Ping(ctx, &clientID)
+ if err != nil {
+ c.ipcClient.Disconnect()
+ return fmt.Errorf("failed to ping RDMA engine: %w", err)
+ }
+
+ latency := time.Duration(pong.ServerRttNs)
+ c.logger.WithFields(logrus.Fields{
+ "latency": latency,
+ "server_rtt": time.Duration(pong.ServerRttNs),
+ }).Info("๐Ÿ“ก RDMA engine ping successful")
+
+ // Get capabilities
+ caps, err := c.ipcClient.GetCapabilities(ctx, &clientID)
+ if err != nil {
+ c.ipcClient.Disconnect()
+ return fmt.Errorf("failed to get engine capabilities: %w", err)
+ }
+
+ c.capabilities = caps
+ c.connected = true
+
+ c.logger.WithFields(logrus.Fields{
+ "version": caps.Version,
+ "device_name": caps.DeviceName,
+ "vendor_id": caps.VendorId,
+ "max_sessions": caps.MaxSessions,
+ "max_transfer_size": caps.MaxTransferSize,
+ "active_sessions": caps.ActiveSessions,
+ "real_rdma": caps.RealRdma,
+ "port_gid": caps.PortGid,
+ "port_lid": caps.PortLid,
+ }).Info("โœ… RDMA engine connected and ready")
+
+ return nil
+}
+
+// Disconnect closes the connection to the RDMA engine
+func (c *Client) Disconnect() {
+ if c.connected {
+ if c.pool != nil {
+ // Connection pooling mode
+ c.pool.Close()
+ c.logger.Info("๐Ÿ”Œ Disconnected from RDMA engine (pool closed)")
+ } else {
+ // Single connection mode
+ c.ipcClient.Disconnect()
+ c.logger.Info("๐Ÿ”Œ Disconnected from RDMA engine")
+ }
+ c.connected = false
+ }
+}
+
+// IsConnected returns true if connected to the RDMA engine
+func (c *Client) IsConnected() bool {
+ if c.pool != nil {
+ // Connection pooling mode - always connected if pool exists
+ return c.connected
+ } else {
+ // Single connection mode
+ return c.connected && c.ipcClient.IsConnected()
+ }
+}
+
+// GetCapabilities returns the RDMA engine capabilities
+func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse {
+ return c.capabilities
+}
+
+// Read performs an RDMA read operation for a SeaweedFS needle
+func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
+ if !c.IsConnected() {
+ return nil, fmt.Errorf("not connected to RDMA engine")
+ }
+
+ startTime := time.Now()
+
+ c.logger.WithFields(logrus.Fields{
+ "volume_id": req.VolumeID,
+ "needle_id": req.NeedleID,
+ "offset": req.Offset,
+ "size": req.Size,
+ }).Debug("๐Ÿ“– Starting RDMA read operation")
+
+ if c.pool != nil {
+ // Connection pooling mode
+ return c.readWithPool(ctx, req, startTime)
+ }
+
+ // Single connection mode
+ // Create IPC request
+ ipcReq := &ipc.StartReadRequest{
+ VolumeID: req.VolumeID,
+ NeedleID: req.NeedleID,
+ Cookie: req.Cookie,
+ Offset: req.Offset,
+ Size: req.Size,
+ RemoteAddr: 0, // Will be set by engine (mock for now)
+ RemoteKey: 0, // Will be set by engine (mock for now)
+ TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
+ AuthToken: req.AuthToken,
+ }
+
+ // Start RDMA read
+ startResp, err := c.ipcClient.StartRead(ctx, ipcReq)
+ if err != nil {
+ c.logger.WithError(err).Error("โŒ Failed to start RDMA read")
+ return nil, fmt.Errorf("failed to start RDMA read: %w", err)
+ }
+
+ // In the new protocol, if we got a StartReadResponse, the operation was successful
+
+ c.logger.WithFields(logrus.Fields{
+ "session_id": startResp.SessionID,
+ "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
+ "local_key": startResp.LocalKey,
+ "transfer_size": startResp.TransferSize,
+ "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
+ "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
+ }).Debug("๐Ÿ“– RDMA read session started")
+
+ // Complete the RDMA read
+ completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
+ if err != nil {
+ c.logger.WithError(err).Error("โŒ Failed to complete RDMA read")
+ return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
+ }
+
+ duration := time.Since(startTime)
+
+ if !completeResp.Success {
+ errorMsg := "unknown error"
+ if completeResp.Message != nil {
+ errorMsg = *completeResp.Message
+ }
+ c.logger.WithFields(logrus.Fields{
+ "session_id": startResp.SessionID,
+ "error_message": errorMsg,
+ }).Error("โŒ RDMA read completion failed")
+ return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
+ }
+
+ // Calculate transfer rate (bytes/second)
+ transferRate := float64(startResp.TransferSize) / duration.Seconds()
+
+ c.logger.WithFields(logrus.Fields{
+ "session_id": startResp.SessionID,
+ "bytes_read": startResp.TransferSize,
+ "duration": duration,
+ "transfer_rate": transferRate,
+ "server_crc": completeResp.ServerCrc,
+ }).Info("โœ… RDMA read completed successfully")
+
+ // MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
+ //
+ // This section generates placeholder data for the mock RDMA implementation.
+ // In a production RDMA implementation, this should be replaced with:
+ //
+ // 1. The actual data transferred via RDMA from the remote memory region
+ // 2. Data validation using checksums/CRC from the RDMA completion
+ // 3. Proper error handling for RDMA transfer failures
+ // 4. Memory region cleanup and deregistration
+ //
+ // TODO for real RDMA implementation:
+ // - Replace mockData with actual RDMA buffer contents
+ // - Validate data integrity using server CRC: completeResp.ServerCrc
+ // - Handle partial transfers and retry logic
+ // - Implement proper memory management for RDMA regions
+ //
+ // Current mock behavior: Generates a simple pattern (0,1,2...255,0,1,2...)
+ // This allows testing of the integration pipeline without real hardware
+ mockData := make([]byte, startResp.TransferSize)
+ for i := range mockData {
+ mockData[i] = byte(i % 256) // Simple repeating pattern for verification
+ }
+ // END MOCK DATA IMPLEMENTATION
+
+ return &ReadResponse{
+ Data: mockData,
+ BytesRead: startResp.TransferSize,
+ Duration: duration,
+ TransferRate: transferRate,
+ SessionID: startResp.SessionID,
+ Success: true,
+ Message: "RDMA read completed successfully",
+ }, nil
+}
+
+// ReadRange performs an RDMA read for a specific range within a needle
+func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) {
+ req := &ReadRequest{
+ VolumeID: volumeID,
+ NeedleID: needleID,
+ Cookie: cookie,
+ Offset: offset,
+ Size: size,
+ }
+ return c.Read(ctx, req)
+}
+
+// ReadFileRange performs an RDMA read using SeaweedFS file ID format
+func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) {
+ // Parse file ID (e.g., "3,01637037d6" -> volume=3, needle=0x01637037d6, cookie extracted)
+ volumeID, needleID, cookie, err := parseFileID(fileID)
+ if err != nil {
+ return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
+ }
+
+ req := &ReadRequest{
+ VolumeID: volumeID,
+ NeedleID: needleID,
+ Cookie: cookie,
+ Offset: offset,
+ Size: size,
+ }
+ return c.Read(ctx, req)
+}
+
+// parseFileID extracts volume ID, needle ID, and cookie from a SeaweedFS file ID
+// Uses existing SeaweedFS parsing logic to ensure compatibility
+func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) {
+ // Use existing SeaweedFS file ID parsing
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, err)
+ }
+
+ volumeID = uint32(fid.VolumeId)
+ needleID = uint64(fid.Key)
+ cookie = uint32(fid.Cookie)
+
+ return volumeID, needleID, cookie, nil
+}
+
+// ReadFull performs an RDMA read for an entire needle
+func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) {
+ req := &ReadRequest{
+ VolumeID: volumeID,
+ NeedleID: needleID,
+ Cookie: cookie,
+ Offset: 0,
+ Size: 0, // 0 means read entire needle
+ }
+ return c.Read(ctx, req)
+}
+
+// Ping tests connectivity to the RDMA engine
+func (c *Client) Ping(ctx context.Context) (time.Duration, error) {
+ if !c.IsConnected() {
+ return 0, fmt.Errorf("not connected to RDMA engine")
+ }
+
+ clientID := "health-check"
+ start := time.Now()
+ pong, err := c.ipcClient.Ping(ctx, &clientID)
+ if err != nil {
+ return 0, err
+ }
+
+ totalLatency := time.Since(start)
+ serverRtt := time.Duration(pong.ServerRttNs)
+
+ c.logger.WithFields(logrus.Fields{
+ "total_latency": totalLatency,
+ "server_rtt": serverRtt,
+ "client_id": clientID,
+ }).Debug("๐Ÿ“ RDMA engine ping successful")
+
+ return totalLatency, nil
+}
+
+// readWithPool performs RDMA read using connection pooling
+func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) {
+ // Get connection from pool
+ conn, err := c.pool.getConnection(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get pooled connection: %w", err)
+ }
+ defer c.pool.releaseConnection(conn)
+
+ c.logger.WithField("session_id", conn.sessionID).Debug("๐Ÿ”Œ Using pooled RDMA connection")
+
+ // Create IPC request
+ ipcReq := &ipc.StartReadRequest{
+ VolumeID: req.VolumeID,
+ NeedleID: req.NeedleID,
+ Cookie: req.Cookie,
+ Offset: req.Offset,
+ Size: req.Size,
+ RemoteAddr: 0, // Will be set by engine (mock for now)
+ RemoteKey: 0, // Will be set by engine (mock for now)
+ TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
+ AuthToken: req.AuthToken,
+ }
+
+ // Start RDMA read
+ startResp, err := conn.ipcClient.StartRead(ctx, ipcReq)
+ if err != nil {
+ c.logger.WithError(err).Error("โŒ Failed to start RDMA read (pooled)")
+ return nil, fmt.Errorf("failed to start RDMA read: %w", err)
+ }
+
+ c.logger.WithFields(logrus.Fields{
+ "session_id": startResp.SessionID,
+ "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
+ "local_key": startResp.LocalKey,
+ "transfer_size": startResp.TransferSize,
+ "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
+ "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
+ "pooled": true,
+ }).Debug("๐Ÿ“– RDMA read session started (pooled)")
+
+ // Complete the RDMA read
+ completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
+ if err != nil {
+ c.logger.WithError(err).Error("โŒ Failed to complete RDMA read (pooled)")
+ return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
+ }
+
+ duration := time.Since(startTime)
+
+ if !completeResp.Success {
+ errorMsg := "unknown error"
+ if completeResp.Message != nil {
+ errorMsg = *completeResp.Message
+ }
+ c.logger.WithFields(logrus.Fields{
+ "session_id": conn.sessionID,
+ "error_message": errorMsg,
+ "pooled": true,
+ }).Error("โŒ RDMA read completion failed (pooled)")
+ return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
+ }
+
+ // Calculate transfer rate (bytes/second)
+ transferRate := float64(startResp.TransferSize) / duration.Seconds()
+
+ c.logger.WithFields(logrus.Fields{
+ "session_id": conn.sessionID,
+ "bytes_read": startResp.TransferSize,
+ "duration": duration,
+ "transfer_rate": transferRate,
+ "server_crc": completeResp.ServerCrc,
+ "pooled": true,
+ }).Info("โœ… RDMA read completed successfully (pooled)")
+
+ // For the mock implementation, we'll return placeholder data
+ // In the real implementation, this would be the actual RDMA transferred data
+ mockData := make([]byte, startResp.TransferSize)
+ for i := range mockData {
+ mockData[i] = byte(i % 256) // Simple pattern for testing
+ }
+
+ return &ReadResponse{
+ Data: mockData,
+ BytesRead: startResp.TransferSize,
+ Duration: duration,
+ TransferRate: transferRate,
+ SessionID: conn.sessionID,
+ Success: true,
+ Message: "RDMA read successful (pooled)",
+ }, nil
+}