aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'seaweedfs-rdma-sidecar/cmd')
-rw-r--r--seaweedfs-rdma-sidecar/cmd/demo-server/main.go663
-rw-r--r--seaweedfs-rdma-sidecar/cmd/sidecar/main.go345
-rw-r--r--seaweedfs-rdma-sidecar/cmd/test-rdma/main.go295
3 files changed, 1303 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/cmd/demo-server/main.go b/seaweedfs-rdma-sidecar/cmd/demo-server/main.go
new file mode 100644
index 000000000..42b5020e5
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/cmd/demo-server/main.go
@@ -0,0 +1,663 @@
+// Package main provides a demonstration server showing SeaweedFS RDMA integration
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "seaweedfs-rdma-sidecar/pkg/seaweedfs"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+)
+
+var (
+ port int
+ rdmaSocket string
+ volumeServerURL string
+ enableRDMA bool
+ enableZeroCopy bool
+ tempDir string
+ enablePooling bool
+ maxConnections int
+ maxIdleTime time.Duration
+ debug bool
+)
+
+func main() {
+ var rootCmd = &cobra.Command{
+ Use: "demo-server",
+ Short: "SeaweedFS RDMA integration demonstration server",
+ Long: `Demonstration server that shows how SeaweedFS can integrate with the RDMA sidecar
+for accelerated read operations. This server provides HTTP endpoints that demonstrate
+the RDMA fast path with HTTP fallback capabilities.`,
+ RunE: runServer,
+ }
+
+ rootCmd.Flags().IntVarP(&port, "port", "p", 8080, "Demo server HTTP port")
+ rootCmd.Flags().StringVarP(&rdmaSocket, "rdma-socket", "r", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
+ rootCmd.Flags().StringVarP(&volumeServerURL, "volume-server", "v", "http://localhost:8080", "SeaweedFS volume server URL for HTTP fallback")
+ rootCmd.Flags().BoolVarP(&enableRDMA, "enable-rdma", "e", true, "Enable RDMA acceleration")
+ rootCmd.Flags().BoolVarP(&enableZeroCopy, "enable-zerocopy", "z", true, "Enable zero-copy optimization via temp files")
+ rootCmd.Flags().StringVarP(&tempDir, "temp-dir", "t", "/tmp/rdma-cache", "Temp directory for zero-copy files")
+ rootCmd.Flags().BoolVar(&enablePooling, "enable-pooling", true, "Enable RDMA connection pooling")
+ rootCmd.Flags().IntVar(&maxConnections, "max-connections", 10, "Maximum connections in RDMA pool")
+ rootCmd.Flags().DurationVar(&maxIdleTime, "max-idle-time", 5*time.Minute, "Maximum idle time for pooled connections")
+ rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
+
+ if err := rootCmd.Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+ os.Exit(1)
+ }
+}
+
+func runServer(cmd *cobra.Command, args []string) error {
+ // Setup logging
+ logger := logrus.New()
+ if debug {
+ logger.SetLevel(logrus.DebugLevel)
+ logger.SetFormatter(&logrus.TextFormatter{
+ FullTimestamp: true,
+ ForceColors: true,
+ })
+ } else {
+ logger.SetLevel(logrus.InfoLevel)
+ }
+
+ logger.WithFields(logrus.Fields{
+ "port": port,
+ "rdma_socket": rdmaSocket,
+ "volume_server_url": volumeServerURL,
+ "enable_rdma": enableRDMA,
+ "enable_zerocopy": enableZeroCopy,
+ "temp_dir": tempDir,
+ "enable_pooling": enablePooling,
+ "max_connections": maxConnections,
+ "max_idle_time": maxIdleTime,
+ "debug": debug,
+ }).Info("๐Ÿš€ Starting SeaweedFS RDMA Demo Server")
+
+ // Create SeaweedFS RDMA client
+ config := &seaweedfs.Config{
+ RDMASocketPath: rdmaSocket,
+ VolumeServerURL: volumeServerURL,
+ Enabled: enableRDMA,
+ DefaultTimeout: 30 * time.Second,
+ Logger: logger,
+ TempDir: tempDir,
+ UseZeroCopy: enableZeroCopy,
+ EnablePooling: enablePooling,
+ MaxConnections: maxConnections,
+ MaxIdleTime: maxIdleTime,
+ }
+
+ rdmaClient, err := seaweedfs.NewSeaweedFSRDMAClient(config)
+ if err != nil {
+ return fmt.Errorf("failed to create RDMA client: %w", err)
+ }
+
+ // Start RDMA client
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ if err := rdmaClient.Start(ctx); err != nil {
+ logger.WithError(err).Error("Failed to start RDMA client")
+ }
+ cancel()
+
+ // Create demo server
+ server := &DemoServer{
+ rdmaClient: rdmaClient,
+ logger: logger,
+ }
+
+ // Setup HTTP routes
+ mux := http.NewServeMux()
+ mux.HandleFunc("/", server.homeHandler)
+ mux.HandleFunc("/health", server.healthHandler)
+ mux.HandleFunc("/stats", server.statsHandler)
+ mux.HandleFunc("/read", server.readHandler)
+ mux.HandleFunc("/benchmark", server.benchmarkHandler)
+ mux.HandleFunc("/cleanup", server.cleanupHandler)
+
+ httpServer := &http.Server{
+ Addr: fmt.Sprintf(":%d", port),
+ Handler: mux,
+ }
+
+ // Handle graceful shutdown
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ logger.WithField("port", port).Info("๐ŸŒ Demo server starting")
+ if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ logger.WithError(err).Fatal("HTTP server failed")
+ }
+ }()
+
+ // Wait for shutdown signal
+ <-sigChan
+ logger.Info("๐Ÿ“ก Received shutdown signal, gracefully shutting down...")
+
+ // Shutdown HTTP server
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer shutdownCancel()
+
+ if err := httpServer.Shutdown(shutdownCtx); err != nil {
+ logger.WithError(err).Error("HTTP server shutdown failed")
+ } else {
+ logger.Info("๐ŸŒ HTTP server shutdown complete")
+ }
+
+ // Stop RDMA client
+ rdmaClient.Stop()
+ logger.Info("๐Ÿ›‘ Demo server shutdown complete")
+
+ return nil
+}
+
+// DemoServer demonstrates SeaweedFS RDMA integration
+type DemoServer struct {
+ rdmaClient *seaweedfs.SeaweedFSRDMAClient
+ logger *logrus.Logger
+}
+
+// homeHandler provides information about the demo server
+func (s *DemoServer) homeHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/html")
+ fmt.Fprintf(w, `<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS RDMA Demo Server</title>
+ <style>
+ body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }
+ .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
+ h1 { color: #2c3e50; }
+ .endpoint { margin: 20px 0; padding: 15px; background: #ecf0f1; border-radius: 4px; }
+ .endpoint h3 { margin: 0 0 10px 0; color: #34495e; }
+ .endpoint a { color: #3498db; text-decoration: none; }
+ .endpoint a:hover { text-decoration: underline; }
+ .status { padding: 10px; border-radius: 4px; margin: 10px 0; }
+ .status.enabled { background: #d5f4e6; color: #27ae60; }
+ .status.disabled { background: #fadbd8; color: #e74c3c; }
+ </style>
+</head>
+<body>
+ <div class="container">
+ <h1>๐Ÿš€ SeaweedFS RDMA Demo Server</h1>
+ <p>This server demonstrates SeaweedFS integration with RDMA acceleration for high-performance reads.</p>
+
+ <div class="status %s">
+ <strong>RDMA Status:</strong> %s
+ </div>
+
+ <h2>๐Ÿ“‹ Available Endpoints</h2>
+
+ <div class="endpoint">
+ <h3>๐Ÿฅ Health Check</h3>
+ <p><a href="/health">/health</a> - Check server and RDMA engine health</p>
+ </div>
+
+ <div class="endpoint">
+ <h3>๐Ÿ“Š Statistics</h3>
+ <p><a href="/stats">/stats</a> - Get RDMA client statistics and capabilities</p>
+ </div>
+
+ <div class="endpoint">
+ <h3>๐Ÿ“– Read Needle</h3>
+ <p><a href="/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080">/read</a> - Read a needle with RDMA fast path</p>
+ <p><strong>Parameters:</strong> file_id OR (volume, needle, cookie), volume_server, offset (optional), size (optional)</p>
+ </div>
+
+ <div class="endpoint">
+ <h3>๐Ÿ Benchmark</h3>
+ <p><a href="/benchmark?iterations=10&size=4096">/benchmark</a> - Run performance benchmark</p>
+ <p><strong>Parameters:</strong> iterations (default: 10), size (default: 4096)</p>
+ </div>
+
+ <h2>๐Ÿ“ Example Usage</h2>
+ <pre>
+# Read a needle using file ID (recommended)
+curl "http://localhost:%d/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080"
+
+# Read a needle using individual parameters (legacy)
+curl "http://localhost:%d/read?volume=1&needle=12345&cookie=305419896&size=1024&volume_server=http://localhost:8080"
+
+# Read a needle (hex cookie)
+curl "http://localhost:%d/read?volume=1&needle=12345&cookie=0x12345678&size=1024&volume_server=http://localhost:8080"
+
+# Run benchmark
+curl "http://localhost:%d/benchmark?iterations=5&size=2048"
+
+# Check health
+curl "http://localhost:%d/health"
+ </pre>
+ </div>
+</body>
+</html>`,
+ map[bool]string{true: "enabled", false: "disabled"}[s.rdmaClient.IsEnabled()],
+ map[bool]string{true: "RDMA Enabled โœ…", false: "RDMA Disabled (HTTP Fallback Only) โš ๏ธ"}[s.rdmaClient.IsEnabled()],
+ port, port, port, port)
+}
+
+// healthHandler checks server and RDMA health
+func (s *DemoServer) healthHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
+ defer cancel()
+
+ health := map[string]interface{}{
+ "status": "healthy",
+ "timestamp": time.Now().Format(time.RFC3339),
+ "rdma": map[string]interface{}{
+ "enabled": false,
+ "connected": false,
+ },
+ }
+
+ if s.rdmaClient != nil {
+ health["rdma"].(map[string]interface{})["enabled"] = s.rdmaClient.IsEnabled()
+ health["rdma"].(map[string]interface{})["type"] = "local"
+
+ if s.rdmaClient.IsEnabled() {
+ if err := s.rdmaClient.HealthCheck(ctx); err != nil {
+ s.logger.WithError(err).Warn("RDMA health check failed")
+ health["rdma"].(map[string]interface{})["error"] = err.Error()
+ } else {
+ health["rdma"].(map[string]interface{})["connected"] = true
+ }
+ }
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(health)
+}
+
+// statsHandler returns RDMA statistics
+func (s *DemoServer) statsHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ var stats map[string]interface{}
+
+ if s.rdmaClient != nil {
+ stats = s.rdmaClient.GetStats()
+ stats["client_type"] = "local"
+ } else {
+ stats = map[string]interface{}{
+ "client_type": "none",
+ "error": "no RDMA client available",
+ }
+ }
+
+ stats["timestamp"] = time.Now().Format(time.RFC3339)
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(stats)
+}
+
+// readHandler demonstrates needle reading with RDMA
+func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse parameters - support both file_id and individual parameters for backward compatibility
+ query := r.URL.Query()
+ volumeServer := query.Get("volume_server")
+ fileID := query.Get("file_id")
+
+ var volumeID, cookie uint64
+ var needleID uint64
+ var err error
+
+ if fileID != "" {
+ // Use file ID format (e.g., "3,01637037d6")
+ // Extract individual components using existing SeaweedFS parsing
+ fid, parseErr := needle.ParseFileIdFromString(fileID)
+ if parseErr != nil {
+ http.Error(w, fmt.Sprintf("invalid 'file_id' parameter: %v", parseErr), http.StatusBadRequest)
+ return
+ }
+ volumeID = uint64(fid.VolumeId)
+ needleID = uint64(fid.Key)
+ cookie = uint64(fid.Cookie)
+ } else {
+ // Use individual parameters (backward compatibility)
+ volumeID, err = strconv.ParseUint(query.Get("volume"), 10, 32)
+ if err != nil {
+ http.Error(w, "invalid 'volume' parameter", http.StatusBadRequest)
+ return
+ }
+
+ needleID, err = strconv.ParseUint(query.Get("needle"), 10, 64)
+ if err != nil {
+ http.Error(w, "invalid 'needle' parameter", http.StatusBadRequest)
+ return
+ }
+
+ // Parse cookie parameter - support both decimal and hexadecimal formats
+ cookieStr := query.Get("cookie")
+ if strings.HasPrefix(strings.ToLower(cookieStr), "0x") {
+ // Parse as hexadecimal (remove "0x" prefix)
+ cookie, err = strconv.ParseUint(cookieStr[2:], 16, 32)
+ } else {
+ // Parse as decimal (default)
+ cookie, err = strconv.ParseUint(cookieStr, 10, 32)
+ }
+ if err != nil {
+ http.Error(w, "invalid 'cookie' parameter (expected decimal or hex with 0x prefix)", http.StatusBadRequest)
+ return
+ }
+ }
+
+ var offset uint64
+ if offsetStr := query.Get("offset"); offsetStr != "" {
+ var parseErr error
+ offset, parseErr = strconv.ParseUint(offsetStr, 10, 64)
+ if parseErr != nil {
+ http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
+ return
+ }
+ }
+
+ var size uint64
+ if sizeStr := query.Get("size"); sizeStr != "" {
+ var parseErr error
+ size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
+ if parseErr != nil {
+ http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
+ return
+ }
+ }
+
+ if volumeServer == "" {
+ http.Error(w, "volume_server parameter is required", http.StatusBadRequest)
+ return
+ }
+
+ if volumeID == 0 || needleID == 0 {
+ http.Error(w, "volume and needle parameters are required", http.StatusBadRequest)
+ return
+ }
+
+ // Note: cookie and size can have defaults for demo purposes when user provides empty values,
+ // but invalid parsing is caught above with proper error responses
+ if cookie == 0 {
+ cookie = 0x12345678 // Default cookie for demo
+ }
+
+ if size == 0 {
+ size = 4096 // Default size
+ }
+
+ logFields := logrus.Fields{
+ "volume_server": volumeServer,
+ "volume_id": volumeID,
+ "needle_id": needleID,
+ "cookie": fmt.Sprintf("0x%x", cookie),
+ "offset": offset,
+ "size": size,
+ }
+ if fileID != "" {
+ logFields["file_id"] = fileID
+ }
+ s.logger.WithFields(logFields).Info("๐Ÿ“– Processing needle read request")
+
+ ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
+ defer cancel()
+
+ start := time.Now()
+ req := &seaweedfs.NeedleReadRequest{
+ VolumeID: uint32(volumeID),
+ NeedleID: needleID,
+ Cookie: uint32(cookie),
+ Offset: offset,
+ Size: size,
+ VolumeServer: volumeServer,
+ }
+
+ resp, err := s.rdmaClient.ReadNeedle(ctx, req)
+
+ if err != nil {
+ s.logger.WithError(err).Error("โŒ Needle read failed")
+ http.Error(w, fmt.Sprintf("Read failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ duration := time.Since(start)
+
+ s.logger.WithFields(logrus.Fields{
+ "volume_id": volumeID,
+ "needle_id": needleID,
+ "is_rdma": resp.IsRDMA,
+ "source": resp.Source,
+ "duration": duration,
+ "data_size": len(resp.Data),
+ }).Info("โœ… Needle read completed")
+
+ // Return metadata and first few bytes
+ result := map[string]interface{}{
+ "success": true,
+ "volume_id": volumeID,
+ "needle_id": needleID,
+ "cookie": fmt.Sprintf("0x%x", cookie),
+ "is_rdma": resp.IsRDMA,
+ "source": resp.Source,
+ "session_id": resp.SessionID,
+ "duration": duration.String(),
+ "data_size": len(resp.Data),
+ "timestamp": time.Now().Format(time.RFC3339),
+ "use_temp_file": resp.UseTempFile,
+ "temp_file": resp.TempFilePath,
+ }
+
+ // Set headers for zero-copy optimization
+ if resp.UseTempFile && resp.TempFilePath != "" {
+ w.Header().Set("X-Use-Temp-File", "true")
+ w.Header().Set("X-Temp-File", resp.TempFilePath)
+ w.Header().Set("X-Source", resp.Source)
+ w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
+
+ // For zero-copy, return minimal JSON response and let client read from temp file
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(result)
+ return
+ }
+
+ // Regular response with data
+ w.Header().Set("X-Source", resp.Source)
+ w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
+
+ // Include first 32 bytes as hex for verification
+ if len(resp.Data) > 0 {
+ displayLen := 32
+ if len(resp.Data) < displayLen {
+ displayLen = len(resp.Data)
+ }
+ result["data_preview"] = fmt.Sprintf("%x", resp.Data[:displayLen])
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(result)
+}
+
+// benchmarkHandler runs performance benchmarks
+func (s *DemoServer) benchmarkHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse parameters
+ query := r.URL.Query()
+
+ iterations := 10 // default value
+ if iterationsStr := query.Get("iterations"); iterationsStr != "" {
+ var parseErr error
+ iterations, parseErr = strconv.Atoi(iterationsStr)
+ if parseErr != nil {
+ http.Error(w, "invalid 'iterations' parameter", http.StatusBadRequest)
+ return
+ }
+ }
+
+ size := uint64(4096) // default value
+ if sizeStr := query.Get("size"); sizeStr != "" {
+ var parseErr error
+ size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
+ if parseErr != nil {
+ http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
+ return
+ }
+ }
+
+ if iterations <= 0 {
+ iterations = 10
+ }
+ if size == 0 {
+ size = 4096
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "iterations": iterations,
+ "size": size,
+ }).Info("๐Ÿ Starting benchmark")
+
+ ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
+ defer cancel()
+
+ var rdmaSuccessful, rdmaFailed, httpSuccessful, httpFailed int
+ var totalDuration time.Duration
+ var totalBytes uint64
+
+ startTime := time.Now()
+
+ for i := 0; i < iterations; i++ {
+ req := &seaweedfs.NeedleReadRequest{
+ VolumeID: 1,
+ NeedleID: uint64(i + 1),
+ Cookie: 0x12345678,
+ Offset: 0,
+ Size: size,
+ }
+
+ opStart := time.Now()
+ resp, err := s.rdmaClient.ReadNeedle(ctx, req)
+ opDuration := time.Since(opStart)
+
+ if err != nil {
+ httpFailed++
+ continue
+ }
+
+ totalDuration += opDuration
+ totalBytes += uint64(len(resp.Data))
+
+ if resp.IsRDMA {
+ rdmaSuccessful++
+ } else {
+ httpSuccessful++
+ }
+ }
+
+ benchDuration := time.Since(startTime)
+
+ // Calculate statistics
+ totalOperations := rdmaSuccessful + httpSuccessful
+ avgLatency := time.Duration(0)
+ if totalOperations > 0 {
+ avgLatency = totalDuration / time.Duration(totalOperations)
+ }
+
+ throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
+ opsPerSec := float64(totalOperations) / benchDuration.Seconds()
+
+ result := map[string]interface{}{
+ "benchmark_results": map[string]interface{}{
+ "iterations": iterations,
+ "size_per_op": size,
+ "total_duration": benchDuration.String(),
+ "successful_ops": totalOperations,
+ "failed_ops": rdmaFailed + httpFailed,
+ "rdma_ops": rdmaSuccessful,
+ "http_ops": httpSuccessful,
+ "avg_latency": avgLatency.String(),
+ "throughput_mbps": fmt.Sprintf("%.2f", throughputMBps),
+ "ops_per_sec": fmt.Sprintf("%.1f", opsPerSec),
+ "total_bytes": totalBytes,
+ },
+ "rdma_enabled": s.rdmaClient.IsEnabled(),
+ "timestamp": time.Now().Format(time.RFC3339),
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "iterations": iterations,
+ "successful_ops": totalOperations,
+ "rdma_ops": rdmaSuccessful,
+ "http_ops": httpSuccessful,
+ "avg_latency": avgLatency,
+ "throughput_mbps": throughputMBps,
+ "ops_per_sec": opsPerSec,
+ }).Info("๐Ÿ“Š Benchmark completed")
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(result)
+}
+
+// cleanupHandler handles temp file cleanup requests from mount clients
+func (s *DemoServer) cleanupHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodDelete {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Get temp file path from query parameters
+ tempFilePath := r.URL.Query().Get("temp_file")
+ if tempFilePath == "" {
+ http.Error(w, "missing 'temp_file' parameter", http.StatusBadRequest)
+ return
+ }
+
+ s.logger.WithField("temp_file", tempFilePath).Debug("๐Ÿ—‘๏ธ Processing cleanup request")
+
+ // Use the RDMA client's cleanup method (which delegates to seaweedfs client)
+ err := s.rdmaClient.CleanupTempFile(tempFilePath)
+ if err != nil {
+ s.logger.WithError(err).WithField("temp_file", tempFilePath).Warn("Failed to cleanup temp file")
+ http.Error(w, fmt.Sprintf("cleanup failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ s.logger.WithField("temp_file", tempFilePath).Debug("๐Ÿงน Temp file cleanup successful")
+
+ // Return success response
+ w.Header().Set("Content-Type", "application/json")
+ response := map[string]interface{}{
+ "success": true,
+ "message": "temp file cleaned up successfully",
+ "temp_file": tempFilePath,
+ "timestamp": time.Now().Format(time.RFC3339),
+ }
+ json.NewEncoder(w).Encode(response)
+}
diff --git a/seaweedfs-rdma-sidecar/cmd/sidecar/main.go b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go
new file mode 100644
index 000000000..55d98c4c6
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go
@@ -0,0 +1,345 @@
+// Package main provides the main RDMA sidecar service that integrates with SeaweedFS
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+
+ "seaweedfs-rdma-sidecar/pkg/rdma"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+)
+
+var (
+ port int
+ engineSocket string
+ debug bool
+ timeout time.Duration
+)
+
+// Response structs for JSON encoding
+type HealthResponse struct {
+ Status string `json:"status"`
+ RdmaEngineConnected bool `json:"rdma_engine_connected"`
+ RdmaEngineLatency string `json:"rdma_engine_latency"`
+ Timestamp string `json:"timestamp"`
+}
+
+type CapabilitiesResponse struct {
+ Version string `json:"version"`
+ DeviceName string `json:"device_name"`
+ VendorId uint32 `json:"vendor_id"`
+ MaxSessions uint32 `json:"max_sessions"`
+ MaxTransferSize uint64 `json:"max_transfer_size"`
+ ActiveSessions uint32 `json:"active_sessions"`
+ RealRdma bool `json:"real_rdma"`
+ PortGid string `json:"port_gid"`
+ PortLid uint16 `json:"port_lid"`
+ SupportedAuth []string `json:"supported_auth"`
+}
+
+type PingResponse struct {
+ Success bool `json:"success"`
+ EngineLatency string `json:"engine_latency"`
+ TotalLatency string `json:"total_latency"`
+ Timestamp string `json:"timestamp"`
+}
+
+func main() {
+ var rootCmd = &cobra.Command{
+ Use: "rdma-sidecar",
+ Short: "SeaweedFS RDMA acceleration sidecar",
+ Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine.
+
+This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance
+Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`,
+ RunE: runSidecar,
+ }
+
+ // Flags
+ rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port")
+ rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
+ rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
+ rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout")
+
+ if err := rootCmd.Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+ os.Exit(1)
+ }
+}
+
+func runSidecar(cmd *cobra.Command, args []string) error {
+ // Setup logging
+ logger := logrus.New()
+ if debug {
+ logger.SetLevel(logrus.DebugLevel)
+ logger.SetFormatter(&logrus.TextFormatter{
+ FullTimestamp: true,
+ ForceColors: true,
+ })
+ } else {
+ logger.SetLevel(logrus.InfoLevel)
+ }
+
+ logger.WithFields(logrus.Fields{
+ "port": port,
+ "engine_socket": engineSocket,
+ "debug": debug,
+ "timeout": timeout,
+ }).Info("๐Ÿš€ Starting SeaweedFS RDMA Sidecar")
+
+ // Create RDMA client
+ rdmaConfig := &rdma.Config{
+ EngineSocketPath: engineSocket,
+ DefaultTimeout: timeout,
+ Logger: logger,
+ }
+
+ rdmaClient := rdma.NewClient(rdmaConfig)
+
+ // Connect to RDMA engine
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ logger.Info("๐Ÿ”— Connecting to RDMA engine...")
+ if err := rdmaClient.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect to RDMA engine: %w", err)
+ }
+ logger.Info("โœ… Connected to RDMA engine successfully")
+
+ // Create HTTP server
+ sidecar := &Sidecar{
+ rdmaClient: rdmaClient,
+ logger: logger,
+ }
+
+ mux := http.NewServeMux()
+
+ // Health check endpoint
+ mux.HandleFunc("/health", sidecar.healthHandler)
+
+ // RDMA operations endpoints
+ mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler)
+ mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler)
+ mux.HandleFunc("/rdma/ping", sidecar.pingHandler)
+
+ server := &http.Server{
+ Addr: fmt.Sprintf(":%d", port),
+ Handler: mux,
+ }
+
+ // Handle graceful shutdown
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ logger.WithField("port", port).Info("๐ŸŒ HTTP server starting")
+ if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ logger.WithError(err).Fatal("HTTP server failed")
+ }
+ }()
+
+ // Wait for shutdown signal
+ <-sigChan
+ logger.Info("๐Ÿ“ก Received shutdown signal, gracefully shutting down...")
+
+ // Shutdown HTTP server
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer shutdownCancel()
+
+ if err := server.Shutdown(shutdownCtx); err != nil {
+ logger.WithError(err).Error("HTTP server shutdown failed")
+ } else {
+ logger.Info("๐ŸŒ HTTP server shutdown complete")
+ }
+
+ // Disconnect from RDMA engine
+ rdmaClient.Disconnect()
+ logger.Info("๐Ÿ›‘ RDMA sidecar shutdown complete")
+
+ return nil
+}
+
+// Sidecar represents the main sidecar service
+type Sidecar struct {
+ rdmaClient *rdma.Client
+ logger *logrus.Logger
+}
+
+// Health check handler
+func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
+ defer cancel()
+
+ // Test RDMA engine connectivity
+ if !s.rdmaClient.IsConnected() {
+ s.logger.Warn("โš ๏ธ RDMA engine not connected")
+ http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable)
+ return
+ }
+
+ // Ping RDMA engine
+ latency, err := s.rdmaClient.Ping(ctx)
+ if err != nil {
+ s.logger.WithError(err).Error("โŒ RDMA engine ping failed")
+ http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := HealthResponse{
+ Status: "healthy",
+ RdmaEngineConnected: true,
+ RdmaEngineLatency: latency.String(),
+ Timestamp: time.Now().Format(time.RFC3339),
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA capabilities handler
+func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ caps := s.rdmaClient.GetCapabilities()
+ if caps == nil {
+ http.Error(w, "No capabilities available", http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := CapabilitiesResponse{
+ Version: caps.Version,
+ DeviceName: caps.DeviceName,
+ VendorId: caps.VendorId,
+ MaxSessions: uint32(caps.MaxSessions),
+ MaxTransferSize: caps.MaxTransferSize,
+ ActiveSessions: uint32(caps.ActiveSessions),
+ RealRdma: caps.RealRdma,
+ PortGid: caps.PortGid,
+ PortLid: caps.PortLid,
+ SupportedAuth: caps.SupportedAuth,
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA ping handler
+func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
+ defer cancel()
+
+ start := time.Now()
+ latency, err := s.rdmaClient.Ping(ctx)
+ totalLatency := time.Since(start)
+
+ if err != nil {
+ s.logger.WithError(err).Error("โŒ RDMA ping failed")
+ http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := PingResponse{
+ Success: true,
+ EngineLatency: latency.String(),
+ TotalLatency: totalLatency.String(),
+ Timestamp: time.Now().Format(time.RFC3339),
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA read handler - uses GET method with query parameters for RESTful read operations
+func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse query parameters
+ query := r.URL.Query()
+
+ // Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier
+ fileID := query.Get("file_id")
+ if fileID == "" {
+ http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest)
+ return
+ }
+
+ // Parse optional offset and size parameters
+ offset := uint64(0) // default value
+ if offsetStr := query.Get("offset"); offsetStr != "" {
+ val, err := strconv.ParseUint(offsetStr, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
+ return
+ }
+ offset = val
+ }
+
+ size := uint64(4096) // default value
+ if sizeStr := query.Get("size"); sizeStr != "" {
+ val, err := strconv.ParseUint(sizeStr, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
+ return
+ }
+ size = val
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "file_id": fileID,
+ "offset": offset,
+ "size": size,
+ }).Info("๐Ÿ“– Processing RDMA read request")
+
+ ctx, cancel := context.WithTimeout(r.Context(), timeout)
+ defer cancel()
+
+ start := time.Now()
+ resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size)
+ duration := time.Since(start)
+
+ if err != nil {
+ s.logger.WithError(err).Error("โŒ RDMA read failed")
+ http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "file_id": fileID,
+ "bytes_read": resp.BytesRead,
+ "duration": duration,
+ "transfer_rate": resp.TransferRate,
+ "session_id": resp.SessionID,
+ }).Info("โœ… RDMA read completed successfully")
+
+ // Set response headers
+ w.Header().Set("Content-Type", "application/octet-stream")
+ w.Header().Set("X-RDMA-Session-ID", resp.SessionID)
+ w.Header().Set("X-RDMA-Duration", duration.String())
+ w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate))
+ w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead))
+
+ // Write the data
+ w.Write(resp.Data)
+}
diff --git a/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go b/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go
new file mode 100644
index 000000000..4f2b2da43
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go
@@ -0,0 +1,295 @@
+// Package main provides a test client for the RDMA engine integration
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "seaweedfs-rdma-sidecar/pkg/rdma"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+)
+
+var (
+ socketPath string
+ debug bool
+ timeout time.Duration
+ volumeID uint32
+ needleID uint64
+ cookie uint32
+ offset uint64
+ size uint64
+)
+
+func main() {
+ var rootCmd = &cobra.Command{
+ Use: "test-rdma",
+ Short: "Test client for SeaweedFS RDMA engine integration",
+ Long: `Test client that demonstrates communication between Go sidecar and Rust RDMA engine.
+
+This tool allows you to test various RDMA operations including:
+- Engine connectivity and capabilities
+- RDMA read operations with mock data
+- Performance measurements
+- IPC protocol validation`,
+ }
+
+ // Global flags
+ defaultSocketPath := os.Getenv("RDMA_SOCKET_PATH")
+ if defaultSocketPath == "" {
+ defaultSocketPath = "/tmp/rdma-engine.sock"
+ }
+ rootCmd.PersistentFlags().StringVarP(&socketPath, "socket", "s", defaultSocketPath, "Path to RDMA engine Unix socket (env: RDMA_SOCKET_PATH)")
+ rootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
+ rootCmd.PersistentFlags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "Operation timeout")
+
+ // Subcommands
+ rootCmd.AddCommand(pingCmd())
+ rootCmd.AddCommand(capsCmd())
+ rootCmd.AddCommand(readCmd())
+ rootCmd.AddCommand(benchCmd())
+
+ if err := rootCmd.Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+ os.Exit(1)
+ }
+}
+
+func pingCmd() *cobra.Command {
+ return &cobra.Command{
+ Use: "ping",
+ Short: "Test connectivity to RDMA engine",
+ Long: "Send a ping message to the RDMA engine and measure latency",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ client := createClient()
+ defer client.Disconnect()
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ fmt.Printf("๐Ÿ“ Pinging RDMA engine at %s...\n", socketPath)
+
+ if err := client.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect: %w", err)
+ }
+
+ latency, err := client.Ping(ctx)
+ if err != nil {
+ return fmt.Errorf("ping failed: %w", err)
+ }
+
+ fmt.Printf("โœ… Ping successful! Latency: %v\n", latency)
+ return nil
+ },
+ }
+}
+
+func capsCmd() *cobra.Command {
+ return &cobra.Command{
+ Use: "capabilities",
+ Short: "Get RDMA engine capabilities",
+ Long: "Query the RDMA engine for its current capabilities and status",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ client := createClient()
+ defer client.Disconnect()
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ fmt.Printf("๐Ÿ” Querying RDMA engine capabilities...\n")
+
+ if err := client.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect: %w", err)
+ }
+
+ caps := client.GetCapabilities()
+ if caps == nil {
+ return fmt.Errorf("no capabilities received")
+ }
+
+ fmt.Printf("\n๐Ÿ“Š RDMA Engine Capabilities:\n")
+ fmt.Printf(" Version: %s\n", caps.Version)
+ fmt.Printf(" Max Sessions: %d\n", caps.MaxSessions)
+ fmt.Printf(" Max Transfer Size: %d bytes (%.1f MB)\n", caps.MaxTransferSize, float64(caps.MaxTransferSize)/(1024*1024))
+ fmt.Printf(" Active Sessions: %d\n", caps.ActiveSessions)
+ fmt.Printf(" Real RDMA: %t\n", caps.RealRdma)
+ fmt.Printf(" Port GID: %s\n", caps.PortGid)
+ fmt.Printf(" Port LID: %d\n", caps.PortLid)
+ fmt.Printf(" Supported Auth: %v\n", caps.SupportedAuth)
+
+ if caps.RealRdma {
+ fmt.Printf("๐Ÿš€ Hardware RDMA enabled!\n")
+ } else {
+ fmt.Printf("๐ŸŸก Using mock RDMA (development mode)\n")
+ }
+
+ return nil
+ },
+ }
+}
+
+func readCmd() *cobra.Command {
+ cmd := &cobra.Command{
+ Use: "read",
+ Short: "Test RDMA read operation",
+ Long: "Perform a test RDMA read operation with specified parameters",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ client := createClient()
+ defer client.Disconnect()
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ fmt.Printf("๐Ÿ“– Testing RDMA read operation...\n")
+ fmt.Printf(" Volume ID: %d\n", volumeID)
+ fmt.Printf(" Needle ID: %d\n", needleID)
+ fmt.Printf(" Cookie: 0x%x\n", cookie)
+ fmt.Printf(" Offset: %d\n", offset)
+ fmt.Printf(" Size: %d bytes\n", size)
+
+ if err := client.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect: %w", err)
+ }
+
+ start := time.Now()
+ resp, err := client.ReadRange(ctx, volumeID, needleID, cookie, offset, size)
+ if err != nil {
+ return fmt.Errorf("read failed: %w", err)
+ }
+
+ duration := time.Since(start)
+
+ fmt.Printf("\nโœ… RDMA read completed successfully!\n")
+ fmt.Printf(" Session ID: %s\n", resp.SessionID)
+ fmt.Printf(" Bytes Read: %d\n", resp.BytesRead)
+ fmt.Printf(" Duration: %v\n", duration)
+ fmt.Printf(" Transfer Rate: %.2f MB/s\n", resp.TransferRate)
+ fmt.Printf(" Success: %t\n", resp.Success)
+ fmt.Printf(" Message: %s\n", resp.Message)
+
+ // Show first few bytes of data for verification
+ if len(resp.Data) > 0 {
+ displayLen := 32
+ if len(resp.Data) < displayLen {
+ displayLen = len(resp.Data)
+ }
+ fmt.Printf(" Data (first %d bytes): %x\n", displayLen, resp.Data[:displayLen])
+ }
+
+ return nil
+ },
+ }
+
+ cmd.Flags().Uint32VarP(&volumeID, "volume", "v", 1, "Volume ID")
+ cmd.Flags().Uint64VarP(&needleID, "needle", "n", 100, "Needle ID")
+ cmd.Flags().Uint32VarP(&cookie, "cookie", "c", 0x12345678, "Needle cookie")
+ cmd.Flags().Uint64VarP(&offset, "offset", "o", 0, "Read offset")
+ cmd.Flags().Uint64VarP(&size, "size", "z", 4096, "Read size in bytes")
+
+ return cmd
+}
+
+func benchCmd() *cobra.Command {
+ var (
+ iterations int
+ readSize uint64
+ )
+
+ cmd := &cobra.Command{
+ Use: "bench",
+ Short: "Benchmark RDMA read performance",
+ Long: "Run multiple RDMA read operations and measure performance statistics",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ client := createClient()
+ defer client.Disconnect()
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ fmt.Printf("๐Ÿ Starting RDMA read benchmark...\n")
+ fmt.Printf(" Iterations: %d\n", iterations)
+ fmt.Printf(" Read Size: %d bytes\n", readSize)
+ fmt.Printf(" Socket: %s\n", socketPath)
+
+ if err := client.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect: %w", err)
+ }
+
+ // Warmup
+ fmt.Printf("๐Ÿ”ฅ Warming up...\n")
+ for i := 0; i < 5; i++ {
+ _, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize)
+ if err != nil {
+ return fmt.Errorf("warmup read %d failed: %w", i+1, err)
+ }
+ }
+
+ // Benchmark
+ fmt.Printf("๐Ÿ“Š Running benchmark...\n")
+ var totalDuration time.Duration
+ var totalBytes uint64
+ successful := 0
+
+ startTime := time.Now()
+ for i := 0; i < iterations; i++ {
+ opStart := time.Now()
+ resp, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize)
+ opDuration := time.Since(opStart)
+
+ if err != nil {
+ fmt.Printf("โŒ Read %d failed: %v\n", i+1, err)
+ continue
+ }
+
+ totalDuration += opDuration
+ totalBytes += resp.BytesRead
+ successful++
+
+ if (i+1)%10 == 0 || i == iterations-1 {
+ fmt.Printf(" Completed %d/%d reads\n", i+1, iterations)
+ }
+ }
+ benchDuration := time.Since(startTime)
+
+ // Calculate statistics
+ avgLatency := totalDuration / time.Duration(successful)
+ throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
+ opsPerSec := float64(successful) / benchDuration.Seconds()
+
+ fmt.Printf("\n๐Ÿ“ˆ Benchmark Results:\n")
+ fmt.Printf(" Total Duration: %v\n", benchDuration)
+ fmt.Printf(" Successful Operations: %d/%d (%.1f%%)\n", successful, iterations, float64(successful)/float64(iterations)*100)
+ fmt.Printf(" Total Bytes Transferred: %d (%.1f MB)\n", totalBytes, float64(totalBytes)/(1024*1024))
+ fmt.Printf(" Average Latency: %v\n", avgLatency)
+ fmt.Printf(" Throughput: %.2f MB/s\n", throughputMBps)
+ fmt.Printf(" Operations/sec: %.1f\n", opsPerSec)
+
+ return nil
+ },
+ }
+
+ cmd.Flags().IntVarP(&iterations, "iterations", "i", 100, "Number of read operations")
+ cmd.Flags().Uint64VarP(&readSize, "read-size", "r", 4096, "Size of each read in bytes")
+
+ return cmd
+}
+
+func createClient() *rdma.Client {
+ logger := logrus.New()
+ if debug {
+ logger.SetLevel(logrus.DebugLevel)
+ } else {
+ logger.SetLevel(logrus.InfoLevel)
+ }
+
+ config := &rdma.Config{
+ EngineSocketPath: socketPath,
+ DefaultTimeout: timeout,
+ Logger: logger,
+ }
+
+ return rdma.NewClient(config)
+}