diff options
Diffstat (limited to 'seaweedfs-rdma-sidecar/pkg/rdma/client.go')
| -rw-r--r-- | seaweedfs-rdma-sidecar/pkg/rdma/client.go | 630 |
1 files changed, 630 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/pkg/rdma/client.go b/seaweedfs-rdma-sidecar/pkg/rdma/client.go new file mode 100644 index 000000000..156bb5497 --- /dev/null +++ b/seaweedfs-rdma-sidecar/pkg/rdma/client.go @@ -0,0 +1,630 @@ +// Package rdma provides high-level RDMA operations for SeaweedFS integration +package rdma + +import ( + "context" + "fmt" + "sync" + "time" + + "seaweedfs-rdma-sidecar/pkg/ipc" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/sirupsen/logrus" +) + +// PooledConnection represents a pooled RDMA connection +type PooledConnection struct { + ipcClient *ipc.Client + lastUsed time.Time + inUse bool + sessionID string + created time.Time +} + +// ConnectionPool manages a pool of RDMA connections +type ConnectionPool struct { + connections []*PooledConnection + mutex sync.RWMutex + maxConnections int + maxIdleTime time.Duration + enginePath string + logger *logrus.Logger +} + +// Client provides high-level RDMA operations with connection pooling +type Client struct { + pool *ConnectionPool + logger *logrus.Logger + enginePath string + capabilities *ipc.GetCapabilitiesResponse + connected bool + defaultTimeout time.Duration + + // Legacy single connection (for backward compatibility) + ipcClient *ipc.Client +} + +// Config holds configuration for the RDMA client +type Config struct { + EngineSocketPath string + DefaultTimeout time.Duration + Logger *logrus.Logger + + // Connection pooling options + EnablePooling bool // Enable connection pooling (default: true) + MaxConnections int // Max connections in pool (default: 10) + MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min) +} + +// ReadRequest represents a SeaweedFS needle read request +type ReadRequest struct { + VolumeID uint32 + NeedleID uint64 + Cookie uint32 + Offset uint64 + Size uint64 + AuthToken *string +} + +// ReadResponse represents the result of an RDMA read operation +type ReadResponse struct { + Data []byte + BytesRead uint64 + Duration time.Duration + TransferRate float64 + SessionID string + Success bool + Message string +} + +// NewConnectionPool creates a new connection pool +func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool { + if maxConnections <= 0 { + maxConnections = 10 // Default + } + if maxIdleTime <= 0 { + maxIdleTime = 5 * time.Minute // Default + } + + return &ConnectionPool{ + connections: make([]*PooledConnection, 0, maxConnections), + maxConnections: maxConnections, + maxIdleTime: maxIdleTime, + enginePath: enginePath, + logger: logger, + } +} + +// getConnection gets an available connection from the pool or creates a new one +func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) { + p.mutex.Lock() + defer p.mutex.Unlock() + + // Look for an available connection + for _, conn := range p.connections { + if !conn.inUse && time.Since(conn.lastUsed) < p.maxIdleTime { + conn.inUse = true + conn.lastUsed = time.Now() + p.logger.WithField("session_id", conn.sessionID).Debug("๐ Reusing pooled RDMA connection") + return conn, nil + } + } + + // Create new connection if under limit + if len(p.connections) < p.maxConnections { + ipcClient := ipc.NewClient(p.enginePath, p.logger) + if err := ipcClient.Connect(ctx); err != nil { + return nil, fmt.Errorf("failed to create new pooled connection: %w", err) + } + + conn := &PooledConnection{ + ipcClient: ipcClient, + lastUsed: time.Now(), + inUse: true, + sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), time.Now().Unix()), + created: time.Now(), + } + + p.connections = append(p.connections, conn) + p.logger.WithFields(logrus.Fields{ + "session_id": conn.sessionID, + "pool_size": len(p.connections), + }).Info("๐ Created new pooled RDMA connection") + + return conn, nil + } + + // Pool is full, wait for an available connection + return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections) +} + +// releaseConnection returns a connection to the pool +func (p *ConnectionPool) releaseConnection(conn *PooledConnection) { + p.mutex.Lock() + defer p.mutex.Unlock() + + conn.inUse = false + conn.lastUsed = time.Now() + + p.logger.WithField("session_id", conn.sessionID).Debug("๐ Released RDMA connection back to pool") +} + +// cleanup removes idle connections from the pool +func (p *ConnectionPool) cleanup() { + p.mutex.Lock() + defer p.mutex.Unlock() + + now := time.Now() + activeConnections := make([]*PooledConnection, 0, len(p.connections)) + + for _, conn := range p.connections { + if conn.inUse || now.Sub(conn.lastUsed) < p.maxIdleTime { + activeConnections = append(activeConnections, conn) + } else { + // Close idle connection + conn.ipcClient.Disconnect() + p.logger.WithFields(logrus.Fields{ + "session_id": conn.sessionID, + "idle_time": now.Sub(conn.lastUsed), + }).Debug("๐งน Cleaned up idle RDMA connection") + } + } + + p.connections = activeConnections +} + +// Close closes all connections in the pool +func (p *ConnectionPool) Close() { + p.mutex.Lock() + defer p.mutex.Unlock() + + for _, conn := range p.connections { + conn.ipcClient.Disconnect() + } + p.connections = nil + p.logger.Info("๐ Connection pool closed") +} + +// NewClient creates a new RDMA client +func NewClient(config *Config) *Client { + if config.Logger == nil { + config.Logger = logrus.New() + config.Logger.SetLevel(logrus.InfoLevel) + } + + if config.DefaultTimeout == 0 { + config.DefaultTimeout = 30 * time.Second + } + + client := &Client{ + logger: config.Logger, + enginePath: config.EngineSocketPath, + defaultTimeout: config.DefaultTimeout, + } + + // Initialize connection pooling if enabled (default: true) + enablePooling := config.EnablePooling + if config.MaxConnections == 0 && config.MaxIdleTime == 0 { + // Default to enabled if not explicitly configured + enablePooling = true + } + + if enablePooling { + client.pool = NewConnectionPool( + config.EngineSocketPath, + config.MaxConnections, + config.MaxIdleTime, + config.Logger, + ) + + // Start cleanup goroutine + go client.startCleanupRoutine() + + config.Logger.WithFields(logrus.Fields{ + "max_connections": client.pool.maxConnections, + "max_idle_time": client.pool.maxIdleTime, + }).Info("๐ RDMA connection pooling enabled") + } else { + // Legacy single connection mode + client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger) + config.Logger.Info("๐ RDMA single connection mode (pooling disabled)") + } + + return client +} + +// startCleanupRoutine starts a background goroutine to clean up idle connections +func (c *Client) startCleanupRoutine() { + ticker := time.NewTicker(1 * time.Minute) // Cleanup every minute + go func() { + defer ticker.Stop() + for range ticker.C { + if c.pool != nil { + c.pool.cleanup() + } + } + }() +} + +// Connect establishes connection to the Rust RDMA engine and queries capabilities +func (c *Client) Connect(ctx context.Context) error { + c.logger.Info("๐ Connecting to RDMA engine") + + if c.pool != nil { + // Connection pooling mode - connections are created on-demand + c.connected = true + c.logger.Info("โ
RDMA client ready (connection pooling enabled)") + return nil + } + + // Single connection mode + if err := c.ipcClient.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to IPC: %w", err) + } + + // Test connectivity with ping + clientID := "rdma-client" + pong, err := c.ipcClient.Ping(ctx, &clientID) + if err != nil { + c.ipcClient.Disconnect() + return fmt.Errorf("failed to ping RDMA engine: %w", err) + } + + latency := time.Duration(pong.ServerRttNs) + c.logger.WithFields(logrus.Fields{ + "latency": latency, + "server_rtt": time.Duration(pong.ServerRttNs), + }).Info("๐ก RDMA engine ping successful") + + // Get capabilities + caps, err := c.ipcClient.GetCapabilities(ctx, &clientID) + if err != nil { + c.ipcClient.Disconnect() + return fmt.Errorf("failed to get engine capabilities: %w", err) + } + + c.capabilities = caps + c.connected = true + + c.logger.WithFields(logrus.Fields{ + "version": caps.Version, + "device_name": caps.DeviceName, + "vendor_id": caps.VendorId, + "max_sessions": caps.MaxSessions, + "max_transfer_size": caps.MaxTransferSize, + "active_sessions": caps.ActiveSessions, + "real_rdma": caps.RealRdma, + "port_gid": caps.PortGid, + "port_lid": caps.PortLid, + }).Info("โ
RDMA engine connected and ready") + + return nil +} + +// Disconnect closes the connection to the RDMA engine +func (c *Client) Disconnect() { + if c.connected { + if c.pool != nil { + // Connection pooling mode + c.pool.Close() + c.logger.Info("๐ Disconnected from RDMA engine (pool closed)") + } else { + // Single connection mode + c.ipcClient.Disconnect() + c.logger.Info("๐ Disconnected from RDMA engine") + } + c.connected = false + } +} + +// IsConnected returns true if connected to the RDMA engine +func (c *Client) IsConnected() bool { + if c.pool != nil { + // Connection pooling mode - always connected if pool exists + return c.connected + } else { + // Single connection mode + return c.connected && c.ipcClient.IsConnected() + } +} + +// GetCapabilities returns the RDMA engine capabilities +func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse { + return c.capabilities +} + +// Read performs an RDMA read operation for a SeaweedFS needle +func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { + if !c.IsConnected() { + return nil, fmt.Errorf("not connected to RDMA engine") + } + + startTime := time.Now() + + c.logger.WithFields(logrus.Fields{ + "volume_id": req.VolumeID, + "needle_id": req.NeedleID, + "offset": req.Offset, + "size": req.Size, + }).Debug("๐ Starting RDMA read operation") + + if c.pool != nil { + // Connection pooling mode + return c.readWithPool(ctx, req, startTime) + } + + // Single connection mode + // Create IPC request + ipcReq := &ipc.StartReadRequest{ + VolumeID: req.VolumeID, + NeedleID: req.NeedleID, + Cookie: req.Cookie, + Offset: req.Offset, + Size: req.Size, + RemoteAddr: 0, // Will be set by engine (mock for now) + RemoteKey: 0, // Will be set by engine (mock for now) + TimeoutSecs: uint64(c.defaultTimeout.Seconds()), + AuthToken: req.AuthToken, + } + + // Start RDMA read + startResp, err := c.ipcClient.StartRead(ctx, ipcReq) + if err != nil { + c.logger.WithError(err).Error("โ Failed to start RDMA read") + return nil, fmt.Errorf("failed to start RDMA read: %w", err) + } + + // In the new protocol, if we got a StartReadResponse, the operation was successful + + c.logger.WithFields(logrus.Fields{ + "session_id": startResp.SessionID, + "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr), + "local_key": startResp.LocalKey, + "transfer_size": startResp.TransferSize, + "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc), + "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339), + }).Debug("๐ RDMA read session started") + + // Complete the RDMA read + completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc) + if err != nil { + c.logger.WithError(err).Error("โ Failed to complete RDMA read") + return nil, fmt.Errorf("failed to complete RDMA read: %w", err) + } + + duration := time.Since(startTime) + + if !completeResp.Success { + errorMsg := "unknown error" + if completeResp.Message != nil { + errorMsg = *completeResp.Message + } + c.logger.WithFields(logrus.Fields{ + "session_id": startResp.SessionID, + "error_message": errorMsg, + }).Error("โ RDMA read completion failed") + return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg) + } + + // Calculate transfer rate (bytes/second) + transferRate := float64(startResp.TransferSize) / duration.Seconds() + + c.logger.WithFields(logrus.Fields{ + "session_id": startResp.SessionID, + "bytes_read": startResp.TransferSize, + "duration": duration, + "transfer_rate": transferRate, + "server_crc": completeResp.ServerCrc, + }).Info("โ
RDMA read completed successfully") + + // MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY + // + // This section generates placeholder data for the mock RDMA implementation. + // In a production RDMA implementation, this should be replaced with: + // + // 1. The actual data transferred via RDMA from the remote memory region + // 2. Data validation using checksums/CRC from the RDMA completion + // 3. Proper error handling for RDMA transfer failures + // 4. Memory region cleanup and deregistration + // + // TODO for real RDMA implementation: + // - Replace mockData with actual RDMA buffer contents + // - Validate data integrity using server CRC: completeResp.ServerCrc + // - Handle partial transfers and retry logic + // - Implement proper memory management for RDMA regions + // + // Current mock behavior: Generates a simple pattern (0,1,2...255,0,1,2...) + // This allows testing of the integration pipeline without real hardware + mockData := make([]byte, startResp.TransferSize) + for i := range mockData { + mockData[i] = byte(i % 256) // Simple repeating pattern for verification + } + // END MOCK DATA IMPLEMENTATION + + return &ReadResponse{ + Data: mockData, + BytesRead: startResp.TransferSize, + Duration: duration, + TransferRate: transferRate, + SessionID: startResp.SessionID, + Success: true, + Message: "RDMA read completed successfully", + }, nil +} + +// ReadRange performs an RDMA read for a specific range within a needle +func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) { + req := &ReadRequest{ + VolumeID: volumeID, + NeedleID: needleID, + Cookie: cookie, + Offset: offset, + Size: size, + } + return c.Read(ctx, req) +} + +// ReadFileRange performs an RDMA read using SeaweedFS file ID format +func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) { + // Parse file ID (e.g., "3,01637037d6" -> volume=3, needle=0x01637037d6, cookie extracted) + volumeID, needleID, cookie, err := parseFileID(fileID) + if err != nil { + return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err) + } + + req := &ReadRequest{ + VolumeID: volumeID, + NeedleID: needleID, + Cookie: cookie, + Offset: offset, + Size: size, + } + return c.Read(ctx, req) +} + +// parseFileID extracts volume ID, needle ID, and cookie from a SeaweedFS file ID +// Uses existing SeaweedFS parsing logic to ensure compatibility +func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) { + // Use existing SeaweedFS file ID parsing + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, err) + } + + volumeID = uint32(fid.VolumeId) + needleID = uint64(fid.Key) + cookie = uint32(fid.Cookie) + + return volumeID, needleID, cookie, nil +} + +// ReadFull performs an RDMA read for an entire needle +func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) { + req := &ReadRequest{ + VolumeID: volumeID, + NeedleID: needleID, + Cookie: cookie, + Offset: 0, + Size: 0, // 0 means read entire needle + } + return c.Read(ctx, req) +} + +// Ping tests connectivity to the RDMA engine +func (c *Client) Ping(ctx context.Context) (time.Duration, error) { + if !c.IsConnected() { + return 0, fmt.Errorf("not connected to RDMA engine") + } + + clientID := "health-check" + start := time.Now() + pong, err := c.ipcClient.Ping(ctx, &clientID) + if err != nil { + return 0, err + } + + totalLatency := time.Since(start) + serverRtt := time.Duration(pong.ServerRttNs) + + c.logger.WithFields(logrus.Fields{ + "total_latency": totalLatency, + "server_rtt": serverRtt, + "client_id": clientID, + }).Debug("๐ RDMA engine ping successful") + + return totalLatency, nil +} + +// readWithPool performs RDMA read using connection pooling +func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) { + // Get connection from pool + conn, err := c.pool.getConnection(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get pooled connection: %w", err) + } + defer c.pool.releaseConnection(conn) + + c.logger.WithField("session_id", conn.sessionID).Debug("๐ Using pooled RDMA connection") + + // Create IPC request + ipcReq := &ipc.StartReadRequest{ + VolumeID: req.VolumeID, + NeedleID: req.NeedleID, + Cookie: req.Cookie, + Offset: req.Offset, + Size: req.Size, + RemoteAddr: 0, // Will be set by engine (mock for now) + RemoteKey: 0, // Will be set by engine (mock for now) + TimeoutSecs: uint64(c.defaultTimeout.Seconds()), + AuthToken: req.AuthToken, + } + + // Start RDMA read + startResp, err := conn.ipcClient.StartRead(ctx, ipcReq) + if err != nil { + c.logger.WithError(err).Error("โ Failed to start RDMA read (pooled)") + return nil, fmt.Errorf("failed to start RDMA read: %w", err) + } + + c.logger.WithFields(logrus.Fields{ + "session_id": startResp.SessionID, + "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr), + "local_key": startResp.LocalKey, + "transfer_size": startResp.TransferSize, + "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc), + "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339), + "pooled": true, + }).Debug("๐ RDMA read session started (pooled)") + + // Complete the RDMA read + completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc) + if err != nil { + c.logger.WithError(err).Error("โ Failed to complete RDMA read (pooled)") + return nil, fmt.Errorf("failed to complete RDMA read: %w", err) + } + + duration := time.Since(startTime) + + if !completeResp.Success { + errorMsg := "unknown error" + if completeResp.Message != nil { + errorMsg = *completeResp.Message + } + c.logger.WithFields(logrus.Fields{ + "session_id": conn.sessionID, + "error_message": errorMsg, + "pooled": true, + }).Error("โ RDMA read completion failed (pooled)") + return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg) + } + + // Calculate transfer rate (bytes/second) + transferRate := float64(startResp.TransferSize) / duration.Seconds() + + c.logger.WithFields(logrus.Fields{ + "session_id": conn.sessionID, + "bytes_read": startResp.TransferSize, + "duration": duration, + "transfer_rate": transferRate, + "server_crc": completeResp.ServerCrc, + "pooled": true, + }).Info("โ
RDMA read completed successfully (pooled)") + + // For the mock implementation, we'll return placeholder data + // In the real implementation, this would be the actual RDMA transferred data + mockData := make([]byte, startResp.TransferSize) + for i := range mockData { + mockData[i] = byte(i % 256) // Simple pattern for testing + } + + return &ReadResponse{ + Data: mockData, + BytesRead: startResp.TransferSize, + Duration: duration, + TransferRate: transferRate, + SessionID: conn.sessionID, + Success: true, + Message: "RDMA read successful (pooled)", + }, nil +} |
