diff options
Diffstat (limited to 'seaweedfs-rdma-sidecar/cmd')
| -rw-r--r-- | seaweedfs-rdma-sidecar/cmd/demo-server/main.go | 663 | ||||
| -rw-r--r-- | seaweedfs-rdma-sidecar/cmd/sidecar/main.go | 345 | ||||
| -rw-r--r-- | seaweedfs-rdma-sidecar/cmd/test-rdma/main.go | 295 |
3 files changed, 1303 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/cmd/demo-server/main.go b/seaweedfs-rdma-sidecar/cmd/demo-server/main.go new file mode 100644 index 000000000..42b5020e5 --- /dev/null +++ b/seaweedfs-rdma-sidecar/cmd/demo-server/main.go @@ -0,0 +1,663 @@ +// Package main provides a demonstration server showing SeaweedFS RDMA integration +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "seaweedfs-rdma-sidecar/pkg/seaweedfs" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + port int + rdmaSocket string + volumeServerURL string + enableRDMA bool + enableZeroCopy bool + tempDir string + enablePooling bool + maxConnections int + maxIdleTime time.Duration + debug bool +) + +func main() { + var rootCmd = &cobra.Command{ + Use: "demo-server", + Short: "SeaweedFS RDMA integration demonstration server", + Long: `Demonstration server that shows how SeaweedFS can integrate with the RDMA sidecar +for accelerated read operations. This server provides HTTP endpoints that demonstrate +the RDMA fast path with HTTP fallback capabilities.`, + RunE: runServer, + } + + rootCmd.Flags().IntVarP(&port, "port", "p", 8080, "Demo server HTTP port") + rootCmd.Flags().StringVarP(&rdmaSocket, "rdma-socket", "r", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket") + rootCmd.Flags().StringVarP(&volumeServerURL, "volume-server", "v", "http://localhost:8080", "SeaweedFS volume server URL for HTTP fallback") + rootCmd.Flags().BoolVarP(&enableRDMA, "enable-rdma", "e", true, "Enable RDMA acceleration") + rootCmd.Flags().BoolVarP(&enableZeroCopy, "enable-zerocopy", "z", true, "Enable zero-copy optimization via temp files") + rootCmd.Flags().StringVarP(&tempDir, "temp-dir", "t", "/tmp/rdma-cache", "Temp directory for zero-copy files") + rootCmd.Flags().BoolVar(&enablePooling, "enable-pooling", true, "Enable RDMA connection pooling") + rootCmd.Flags().IntVar(&maxConnections, "max-connections", 10, "Maximum connections in RDMA pool") + rootCmd.Flags().DurationVar(&maxIdleTime, "max-idle-time", 5*time.Minute, "Maximum idle time for pooled connections") + rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging") + + if err := rootCmd.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func runServer(cmd *cobra.Command, args []string) error { + // Setup logging + logger := logrus.New() + if debug { + logger.SetLevel(logrus.DebugLevel) + logger.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + ForceColors: true, + }) + } else { + logger.SetLevel(logrus.InfoLevel) + } + + logger.WithFields(logrus.Fields{ + "port": port, + "rdma_socket": rdmaSocket, + "volume_server_url": volumeServerURL, + "enable_rdma": enableRDMA, + "enable_zerocopy": enableZeroCopy, + "temp_dir": tempDir, + "enable_pooling": enablePooling, + "max_connections": maxConnections, + "max_idle_time": maxIdleTime, + "debug": debug, + }).Info("๐ Starting SeaweedFS RDMA Demo Server") + + // Create SeaweedFS RDMA client + config := &seaweedfs.Config{ + RDMASocketPath: rdmaSocket, + VolumeServerURL: volumeServerURL, + Enabled: enableRDMA, + DefaultTimeout: 30 * time.Second, + Logger: logger, + TempDir: tempDir, + UseZeroCopy: enableZeroCopy, + EnablePooling: enablePooling, + MaxConnections: maxConnections, + MaxIdleTime: maxIdleTime, + } + + rdmaClient, err := seaweedfs.NewSeaweedFSRDMAClient(config) + if err != nil { + return fmt.Errorf("failed to create RDMA client: %w", err) + } + + // Start RDMA client + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := rdmaClient.Start(ctx); err != nil { + logger.WithError(err).Error("Failed to start RDMA client") + } + cancel() + + // Create demo server + server := &DemoServer{ + rdmaClient: rdmaClient, + logger: logger, + } + + // Setup HTTP routes + mux := http.NewServeMux() + mux.HandleFunc("/", server.homeHandler) + mux.HandleFunc("/health", server.healthHandler) + mux.HandleFunc("/stats", server.statsHandler) + mux.HandleFunc("/read", server.readHandler) + mux.HandleFunc("/benchmark", server.benchmarkHandler) + mux.HandleFunc("/cleanup", server.cleanupHandler) + + httpServer := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + } + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + logger.WithField("port", port).Info("๐ Demo server starting") + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.WithError(err).Fatal("HTTP server failed") + } + }() + + // Wait for shutdown signal + <-sigChan + logger.Info("๐ก Received shutdown signal, gracefully shutting down...") + + // Shutdown HTTP server + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + if err := httpServer.Shutdown(shutdownCtx); err != nil { + logger.WithError(err).Error("HTTP server shutdown failed") + } else { + logger.Info("๐ HTTP server shutdown complete") + } + + // Stop RDMA client + rdmaClient.Stop() + logger.Info("๐ Demo server shutdown complete") + + return nil +} + +// DemoServer demonstrates SeaweedFS RDMA integration +type DemoServer struct { + rdmaClient *seaweedfs.SeaweedFSRDMAClient + logger *logrus.Logger +} + +// homeHandler provides information about the demo server +func (s *DemoServer) homeHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + w.Header().Set("Content-Type", "text/html") + fmt.Fprintf(w, `<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS RDMA Demo Server</title> + <style> + body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; } + .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } + h1 { color: #2c3e50; } + .endpoint { margin: 20px 0; padding: 15px; background: #ecf0f1; border-radius: 4px; } + .endpoint h3 { margin: 0 0 10px 0; color: #34495e; } + .endpoint a { color: #3498db; text-decoration: none; } + .endpoint a:hover { text-decoration: underline; } + .status { padding: 10px; border-radius: 4px; margin: 10px 0; } + .status.enabled { background: #d5f4e6; color: #27ae60; } + .status.disabled { background: #fadbd8; color: #e74c3c; } + </style> +</head> +<body> + <div class="container"> + <h1>๐ SeaweedFS RDMA Demo Server</h1> + <p>This server demonstrates SeaweedFS integration with RDMA acceleration for high-performance reads.</p> + + <div class="status %s"> + <strong>RDMA Status:</strong> %s + </div> + + <h2>๐ Available Endpoints</h2> + + <div class="endpoint"> + <h3>๐ฅ Health Check</h3> + <p><a href="/health">/health</a> - Check server and RDMA engine health</p> + </div> + + <div class="endpoint"> + <h3>๐ Statistics</h3> + <p><a href="/stats">/stats</a> - Get RDMA client statistics and capabilities</p> + </div> + + <div class="endpoint"> + <h3>๐ Read Needle</h3> + <p><a href="/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080">/read</a> - Read a needle with RDMA fast path</p> + <p><strong>Parameters:</strong> file_id OR (volume, needle, cookie), volume_server, offset (optional), size (optional)</p> + </div> + + <div class="endpoint"> + <h3>๐ Benchmark</h3> + <p><a href="/benchmark?iterations=10&size=4096">/benchmark</a> - Run performance benchmark</p> + <p><strong>Parameters:</strong> iterations (default: 10), size (default: 4096)</p> + </div> + + <h2>๐ Example Usage</h2> + <pre> +# Read a needle using file ID (recommended) +curl "http://localhost:%d/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080" + +# Read a needle using individual parameters (legacy) +curl "http://localhost:%d/read?volume=1&needle=12345&cookie=305419896&size=1024&volume_server=http://localhost:8080" + +# Read a needle (hex cookie) +curl "http://localhost:%d/read?volume=1&needle=12345&cookie=0x12345678&size=1024&volume_server=http://localhost:8080" + +# Run benchmark +curl "http://localhost:%d/benchmark?iterations=5&size=2048" + +# Check health +curl "http://localhost:%d/health" + </pre> + </div> +</body> +</html>`, + map[bool]string{true: "enabled", false: "disabled"}[s.rdmaClient.IsEnabled()], + map[bool]string{true: "RDMA Enabled โ
", false: "RDMA Disabled (HTTP Fallback Only) โ ๏ธ"}[s.rdmaClient.IsEnabled()], + port, port, port, port) +} + +// healthHandler checks server and RDMA health +func (s *DemoServer) healthHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + health := map[string]interface{}{ + "status": "healthy", + "timestamp": time.Now().Format(time.RFC3339), + "rdma": map[string]interface{}{ + "enabled": false, + "connected": false, + }, + } + + if s.rdmaClient != nil { + health["rdma"].(map[string]interface{})["enabled"] = s.rdmaClient.IsEnabled() + health["rdma"].(map[string]interface{})["type"] = "local" + + if s.rdmaClient.IsEnabled() { + if err := s.rdmaClient.HealthCheck(ctx); err != nil { + s.logger.WithError(err).Warn("RDMA health check failed") + health["rdma"].(map[string]interface{})["error"] = err.Error() + } else { + health["rdma"].(map[string]interface{})["connected"] = true + } + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(health) +} + +// statsHandler returns RDMA statistics +func (s *DemoServer) statsHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var stats map[string]interface{} + + if s.rdmaClient != nil { + stats = s.rdmaClient.GetStats() + stats["client_type"] = "local" + } else { + stats = map[string]interface{}{ + "client_type": "none", + "error": "no RDMA client available", + } + } + + stats["timestamp"] = time.Now().Format(time.RFC3339) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +// readHandler demonstrates needle reading with RDMA +func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse parameters - support both file_id and individual parameters for backward compatibility + query := r.URL.Query() + volumeServer := query.Get("volume_server") + fileID := query.Get("file_id") + + var volumeID, cookie uint64 + var needleID uint64 + var err error + + if fileID != "" { + // Use file ID format (e.g., "3,01637037d6") + // Extract individual components using existing SeaweedFS parsing + fid, parseErr := needle.ParseFileIdFromString(fileID) + if parseErr != nil { + http.Error(w, fmt.Sprintf("invalid 'file_id' parameter: %v", parseErr), http.StatusBadRequest) + return + } + volumeID = uint64(fid.VolumeId) + needleID = uint64(fid.Key) + cookie = uint64(fid.Cookie) + } else { + // Use individual parameters (backward compatibility) + volumeID, err = strconv.ParseUint(query.Get("volume"), 10, 32) + if err != nil { + http.Error(w, "invalid 'volume' parameter", http.StatusBadRequest) + return + } + + needleID, err = strconv.ParseUint(query.Get("needle"), 10, 64) + if err != nil { + http.Error(w, "invalid 'needle' parameter", http.StatusBadRequest) + return + } + + // Parse cookie parameter - support both decimal and hexadecimal formats + cookieStr := query.Get("cookie") + if strings.HasPrefix(strings.ToLower(cookieStr), "0x") { + // Parse as hexadecimal (remove "0x" prefix) + cookie, err = strconv.ParseUint(cookieStr[2:], 16, 32) + } else { + // Parse as decimal (default) + cookie, err = strconv.ParseUint(cookieStr, 10, 32) + } + if err != nil { + http.Error(w, "invalid 'cookie' parameter (expected decimal or hex with 0x prefix)", http.StatusBadRequest) + return + } + } + + var offset uint64 + if offsetStr := query.Get("offset"); offsetStr != "" { + var parseErr error + offset, parseErr = strconv.ParseUint(offsetStr, 10, 64) + if parseErr != nil { + http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest) + return + } + } + + var size uint64 + if sizeStr := query.Get("size"); sizeStr != "" { + var parseErr error + size, parseErr = strconv.ParseUint(sizeStr, 10, 64) + if parseErr != nil { + http.Error(w, "invalid 'size' parameter", http.StatusBadRequest) + return + } + } + + if volumeServer == "" { + http.Error(w, "volume_server parameter is required", http.StatusBadRequest) + return + } + + if volumeID == 0 || needleID == 0 { + http.Error(w, "volume and needle parameters are required", http.StatusBadRequest) + return + } + + // Note: cookie and size can have defaults for demo purposes when user provides empty values, + // but invalid parsing is caught above with proper error responses + if cookie == 0 { + cookie = 0x12345678 // Default cookie for demo + } + + if size == 0 { + size = 4096 // Default size + } + + logFields := logrus.Fields{ + "volume_server": volumeServer, + "volume_id": volumeID, + "needle_id": needleID, + "cookie": fmt.Sprintf("0x%x", cookie), + "offset": offset, + "size": size, + } + if fileID != "" { + logFields["file_id"] = fileID + } + s.logger.WithFields(logFields).Info("๐ Processing needle read request") + + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + + start := time.Now() + req := &seaweedfs.NeedleReadRequest{ + VolumeID: uint32(volumeID), + NeedleID: needleID, + Cookie: uint32(cookie), + Offset: offset, + Size: size, + VolumeServer: volumeServer, + } + + resp, err := s.rdmaClient.ReadNeedle(ctx, req) + + if err != nil { + s.logger.WithError(err).Error("โ Needle read failed") + http.Error(w, fmt.Sprintf("Read failed: %v", err), http.StatusInternalServerError) + return + } + + duration := time.Since(start) + + s.logger.WithFields(logrus.Fields{ + "volume_id": volumeID, + "needle_id": needleID, + "is_rdma": resp.IsRDMA, + "source": resp.Source, + "duration": duration, + "data_size": len(resp.Data), + }).Info("โ
Needle read completed") + + // Return metadata and first few bytes + result := map[string]interface{}{ + "success": true, + "volume_id": volumeID, + "needle_id": needleID, + "cookie": fmt.Sprintf("0x%x", cookie), + "is_rdma": resp.IsRDMA, + "source": resp.Source, + "session_id": resp.SessionID, + "duration": duration.String(), + "data_size": len(resp.Data), + "timestamp": time.Now().Format(time.RFC3339), + "use_temp_file": resp.UseTempFile, + "temp_file": resp.TempFilePath, + } + + // Set headers for zero-copy optimization + if resp.UseTempFile && resp.TempFilePath != "" { + w.Header().Set("X-Use-Temp-File", "true") + w.Header().Set("X-Temp-File", resp.TempFilePath) + w.Header().Set("X-Source", resp.Source) + w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA)) + + // For zero-copy, return minimal JSON response and let client read from temp file + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) + return + } + + // Regular response with data + w.Header().Set("X-Source", resp.Source) + w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA)) + + // Include first 32 bytes as hex for verification + if len(resp.Data) > 0 { + displayLen := 32 + if len(resp.Data) < displayLen { + displayLen = len(resp.Data) + } + result["data_preview"] = fmt.Sprintf("%x", resp.Data[:displayLen]) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) +} + +// benchmarkHandler runs performance benchmarks +func (s *DemoServer) benchmarkHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse parameters + query := r.URL.Query() + + iterations := 10 // default value + if iterationsStr := query.Get("iterations"); iterationsStr != "" { + var parseErr error + iterations, parseErr = strconv.Atoi(iterationsStr) + if parseErr != nil { + http.Error(w, "invalid 'iterations' parameter", http.StatusBadRequest) + return + } + } + + size := uint64(4096) // default value + if sizeStr := query.Get("size"); sizeStr != "" { + var parseErr error + size, parseErr = strconv.ParseUint(sizeStr, 10, 64) + if parseErr != nil { + http.Error(w, "invalid 'size' parameter", http.StatusBadRequest) + return + } + } + + if iterations <= 0 { + iterations = 10 + } + if size == 0 { + size = 4096 + } + + s.logger.WithFields(logrus.Fields{ + "iterations": iterations, + "size": size, + }).Info("๐ Starting benchmark") + + ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second) + defer cancel() + + var rdmaSuccessful, rdmaFailed, httpSuccessful, httpFailed int + var totalDuration time.Duration + var totalBytes uint64 + + startTime := time.Now() + + for i := 0; i < iterations; i++ { + req := &seaweedfs.NeedleReadRequest{ + VolumeID: 1, + NeedleID: uint64(i + 1), + Cookie: 0x12345678, + Offset: 0, + Size: size, + } + + opStart := time.Now() + resp, err := s.rdmaClient.ReadNeedle(ctx, req) + opDuration := time.Since(opStart) + + if err != nil { + httpFailed++ + continue + } + + totalDuration += opDuration + totalBytes += uint64(len(resp.Data)) + + if resp.IsRDMA { + rdmaSuccessful++ + } else { + httpSuccessful++ + } + } + + benchDuration := time.Since(startTime) + + // Calculate statistics + totalOperations := rdmaSuccessful + httpSuccessful + avgLatency := time.Duration(0) + if totalOperations > 0 { + avgLatency = totalDuration / time.Duration(totalOperations) + } + + throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024) + opsPerSec := float64(totalOperations) / benchDuration.Seconds() + + result := map[string]interface{}{ + "benchmark_results": map[string]interface{}{ + "iterations": iterations, + "size_per_op": size, + "total_duration": benchDuration.String(), + "successful_ops": totalOperations, + "failed_ops": rdmaFailed + httpFailed, + "rdma_ops": rdmaSuccessful, + "http_ops": httpSuccessful, + "avg_latency": avgLatency.String(), + "throughput_mbps": fmt.Sprintf("%.2f", throughputMBps), + "ops_per_sec": fmt.Sprintf("%.1f", opsPerSec), + "total_bytes": totalBytes, + }, + "rdma_enabled": s.rdmaClient.IsEnabled(), + "timestamp": time.Now().Format(time.RFC3339), + } + + s.logger.WithFields(logrus.Fields{ + "iterations": iterations, + "successful_ops": totalOperations, + "rdma_ops": rdmaSuccessful, + "http_ops": httpSuccessful, + "avg_latency": avgLatency, + "throughput_mbps": throughputMBps, + "ops_per_sec": opsPerSec, + }).Info("๐ Benchmark completed") + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) +} + +// cleanupHandler handles temp file cleanup requests from mount clients +func (s *DemoServer) cleanupHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Get temp file path from query parameters + tempFilePath := r.URL.Query().Get("temp_file") + if tempFilePath == "" { + http.Error(w, "missing 'temp_file' parameter", http.StatusBadRequest) + return + } + + s.logger.WithField("temp_file", tempFilePath).Debug("๐๏ธ Processing cleanup request") + + // Use the RDMA client's cleanup method (which delegates to seaweedfs client) + err := s.rdmaClient.CleanupTempFile(tempFilePath) + if err != nil { + s.logger.WithError(err).WithField("temp_file", tempFilePath).Warn("Failed to cleanup temp file") + http.Error(w, fmt.Sprintf("cleanup failed: %v", err), http.StatusInternalServerError) + return + } + + s.logger.WithField("temp_file", tempFilePath).Debug("๐งน Temp file cleanup successful") + + // Return success response + w.Header().Set("Content-Type", "application/json") + response := map[string]interface{}{ + "success": true, + "message": "temp file cleaned up successfully", + "temp_file": tempFilePath, + "timestamp": time.Now().Format(time.RFC3339), + } + json.NewEncoder(w).Encode(response) +} diff --git a/seaweedfs-rdma-sidecar/cmd/sidecar/main.go b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go new file mode 100644 index 000000000..55d98c4c6 --- /dev/null +++ b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go @@ -0,0 +1,345 @@ +// Package main provides the main RDMA sidecar service that integrates with SeaweedFS +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "seaweedfs-rdma-sidecar/pkg/rdma" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + port int + engineSocket string + debug bool + timeout time.Duration +) + +// Response structs for JSON encoding +type HealthResponse struct { + Status string `json:"status"` + RdmaEngineConnected bool `json:"rdma_engine_connected"` + RdmaEngineLatency string `json:"rdma_engine_latency"` + Timestamp string `json:"timestamp"` +} + +type CapabilitiesResponse struct { + Version string `json:"version"` + DeviceName string `json:"device_name"` + VendorId uint32 `json:"vendor_id"` + MaxSessions uint32 `json:"max_sessions"` + MaxTransferSize uint64 `json:"max_transfer_size"` + ActiveSessions uint32 `json:"active_sessions"` + RealRdma bool `json:"real_rdma"` + PortGid string `json:"port_gid"` + PortLid uint16 `json:"port_lid"` + SupportedAuth []string `json:"supported_auth"` +} + +type PingResponse struct { + Success bool `json:"success"` + EngineLatency string `json:"engine_latency"` + TotalLatency string `json:"total_latency"` + Timestamp string `json:"timestamp"` +} + +func main() { + var rootCmd = &cobra.Command{ + Use: "rdma-sidecar", + Short: "SeaweedFS RDMA acceleration sidecar", + Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine. + +This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance +Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`, + RunE: runSidecar, + } + + // Flags + rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port") + rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket") + rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging") + rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout") + + if err := rootCmd.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func runSidecar(cmd *cobra.Command, args []string) error { + // Setup logging + logger := logrus.New() + if debug { + logger.SetLevel(logrus.DebugLevel) + logger.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + ForceColors: true, + }) + } else { + logger.SetLevel(logrus.InfoLevel) + } + + logger.WithFields(logrus.Fields{ + "port": port, + "engine_socket": engineSocket, + "debug": debug, + "timeout": timeout, + }).Info("๐ Starting SeaweedFS RDMA Sidecar") + + // Create RDMA client + rdmaConfig := &rdma.Config{ + EngineSocketPath: engineSocket, + DefaultTimeout: timeout, + Logger: logger, + } + + rdmaClient := rdma.NewClient(rdmaConfig) + + // Connect to RDMA engine + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + logger.Info("๐ Connecting to RDMA engine...") + if err := rdmaClient.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to RDMA engine: %w", err) + } + logger.Info("โ
Connected to RDMA engine successfully") + + // Create HTTP server + sidecar := &Sidecar{ + rdmaClient: rdmaClient, + logger: logger, + } + + mux := http.NewServeMux() + + // Health check endpoint + mux.HandleFunc("/health", sidecar.healthHandler) + + // RDMA operations endpoints + mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler) + mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler) + mux.HandleFunc("/rdma/ping", sidecar.pingHandler) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + } + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + logger.WithField("port", port).Info("๐ HTTP server starting") + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.WithError(err).Fatal("HTTP server failed") + } + }() + + // Wait for shutdown signal + <-sigChan + logger.Info("๐ก Received shutdown signal, gracefully shutting down...") + + // Shutdown HTTP server + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + logger.WithError(err).Error("HTTP server shutdown failed") + } else { + logger.Info("๐ HTTP server shutdown complete") + } + + // Disconnect from RDMA engine + rdmaClient.Disconnect() + logger.Info("๐ RDMA sidecar shutdown complete") + + return nil +} + +// Sidecar represents the main sidecar service +type Sidecar struct { + rdmaClient *rdma.Client + logger *logrus.Logger +} + +// Health check handler +func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + // Test RDMA engine connectivity + if !s.rdmaClient.IsConnected() { + s.logger.Warn("โ ๏ธ RDMA engine not connected") + http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable) + return + } + + // Ping RDMA engine + latency, err := s.rdmaClient.Ping(ctx) + if err != nil { + s.logger.WithError(err).Error("โ RDMA engine ping failed") + http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable) + return + } + + w.Header().Set("Content-Type", "application/json") + response := HealthResponse{ + Status: "healthy", + RdmaEngineConnected: true, + RdmaEngineLatency: latency.String(), + Timestamp: time.Now().Format(time.RFC3339), + } + json.NewEncoder(w).Encode(response) +} + +// RDMA capabilities handler +func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + caps := s.rdmaClient.GetCapabilities() + if caps == nil { + http.Error(w, "No capabilities available", http.StatusServiceUnavailable) + return + } + + w.Header().Set("Content-Type", "application/json") + response := CapabilitiesResponse{ + Version: caps.Version, + DeviceName: caps.DeviceName, + VendorId: caps.VendorId, + MaxSessions: uint32(caps.MaxSessions), + MaxTransferSize: caps.MaxTransferSize, + ActiveSessions: uint32(caps.ActiveSessions), + RealRdma: caps.RealRdma, + PortGid: caps.PortGid, + PortLid: caps.PortLid, + SupportedAuth: caps.SupportedAuth, + } + json.NewEncoder(w).Encode(response) +} + +// RDMA ping handler +func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + start := time.Now() + latency, err := s.rdmaClient.Ping(ctx) + totalLatency := time.Since(start) + + if err != nil { + s.logger.WithError(err).Error("โ RDMA ping failed") + http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + response := PingResponse{ + Success: true, + EngineLatency: latency.String(), + TotalLatency: totalLatency.String(), + Timestamp: time.Now().Format(time.RFC3339), + } + json.NewEncoder(w).Encode(response) +} + +// RDMA read handler - uses GET method with query parameters for RESTful read operations +func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse query parameters + query := r.URL.Query() + + // Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier + fileID := query.Get("file_id") + if fileID == "" { + http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest) + return + } + + // Parse optional offset and size parameters + offset := uint64(0) // default value + if offsetStr := query.Get("offset"); offsetStr != "" { + val, err := strconv.ParseUint(offsetStr, 10, 64) + if err != nil { + http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest) + return + } + offset = val + } + + size := uint64(4096) // default value + if sizeStr := query.Get("size"); sizeStr != "" { + val, err := strconv.ParseUint(sizeStr, 10, 64) + if err != nil { + http.Error(w, "invalid 'size' parameter", http.StatusBadRequest) + return + } + size = val + } + + s.logger.WithFields(logrus.Fields{ + "file_id": fileID, + "offset": offset, + "size": size, + }).Info("๐ Processing RDMA read request") + + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + + start := time.Now() + resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size) + duration := time.Since(start) + + if err != nil { + s.logger.WithError(err).Error("โ RDMA read failed") + http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError) + return + } + + s.logger.WithFields(logrus.Fields{ + "file_id": fileID, + "bytes_read": resp.BytesRead, + "duration": duration, + "transfer_rate": resp.TransferRate, + "session_id": resp.SessionID, + }).Info("โ
RDMA read completed successfully") + + // Set response headers + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("X-RDMA-Session-ID", resp.SessionID) + w.Header().Set("X-RDMA-Duration", duration.String()) + w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate)) + w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead)) + + // Write the data + w.Write(resp.Data) +} diff --git a/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go b/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go new file mode 100644 index 000000000..4f2b2da43 --- /dev/null +++ b/seaweedfs-rdma-sidecar/cmd/test-rdma/main.go @@ -0,0 +1,295 @@ +// Package main provides a test client for the RDMA engine integration +package main + +import ( + "context" + "fmt" + "os" + "time" + + "seaweedfs-rdma-sidecar/pkg/rdma" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + socketPath string + debug bool + timeout time.Duration + volumeID uint32 + needleID uint64 + cookie uint32 + offset uint64 + size uint64 +) + +func main() { + var rootCmd = &cobra.Command{ + Use: "test-rdma", + Short: "Test client for SeaweedFS RDMA engine integration", + Long: `Test client that demonstrates communication between Go sidecar and Rust RDMA engine. + +This tool allows you to test various RDMA operations including: +- Engine connectivity and capabilities +- RDMA read operations with mock data +- Performance measurements +- IPC protocol validation`, + } + + // Global flags + defaultSocketPath := os.Getenv("RDMA_SOCKET_PATH") + if defaultSocketPath == "" { + defaultSocketPath = "/tmp/rdma-engine.sock" + } + rootCmd.PersistentFlags().StringVarP(&socketPath, "socket", "s", defaultSocketPath, "Path to RDMA engine Unix socket (env: RDMA_SOCKET_PATH)") + rootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging") + rootCmd.PersistentFlags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "Operation timeout") + + // Subcommands + rootCmd.AddCommand(pingCmd()) + rootCmd.AddCommand(capsCmd()) + rootCmd.AddCommand(readCmd()) + rootCmd.AddCommand(benchCmd()) + + if err := rootCmd.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func pingCmd() *cobra.Command { + return &cobra.Command{ + Use: "ping", + Short: "Test connectivity to RDMA engine", + Long: "Send a ping message to the RDMA engine and measure latency", + RunE: func(cmd *cobra.Command, args []string) error { + client := createClient() + defer client.Disconnect() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + fmt.Printf("๐ Pinging RDMA engine at %s...\n", socketPath) + + if err := client.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + latency, err := client.Ping(ctx) + if err != nil { + return fmt.Errorf("ping failed: %w", err) + } + + fmt.Printf("โ
Ping successful! Latency: %v\n", latency) + return nil + }, + } +} + +func capsCmd() *cobra.Command { + return &cobra.Command{ + Use: "capabilities", + Short: "Get RDMA engine capabilities", + Long: "Query the RDMA engine for its current capabilities and status", + RunE: func(cmd *cobra.Command, args []string) error { + client := createClient() + defer client.Disconnect() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + fmt.Printf("๐ Querying RDMA engine capabilities...\n") + + if err := client.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + caps := client.GetCapabilities() + if caps == nil { + return fmt.Errorf("no capabilities received") + } + + fmt.Printf("\n๐ RDMA Engine Capabilities:\n") + fmt.Printf(" Version: %s\n", caps.Version) + fmt.Printf(" Max Sessions: %d\n", caps.MaxSessions) + fmt.Printf(" Max Transfer Size: %d bytes (%.1f MB)\n", caps.MaxTransferSize, float64(caps.MaxTransferSize)/(1024*1024)) + fmt.Printf(" Active Sessions: %d\n", caps.ActiveSessions) + fmt.Printf(" Real RDMA: %t\n", caps.RealRdma) + fmt.Printf(" Port GID: %s\n", caps.PortGid) + fmt.Printf(" Port LID: %d\n", caps.PortLid) + fmt.Printf(" Supported Auth: %v\n", caps.SupportedAuth) + + if caps.RealRdma { + fmt.Printf("๐ Hardware RDMA enabled!\n") + } else { + fmt.Printf("๐ก Using mock RDMA (development mode)\n") + } + + return nil + }, + } +} + +func readCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "read", + Short: "Test RDMA read operation", + Long: "Perform a test RDMA read operation with specified parameters", + RunE: func(cmd *cobra.Command, args []string) error { + client := createClient() + defer client.Disconnect() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + fmt.Printf("๐ Testing RDMA read operation...\n") + fmt.Printf(" Volume ID: %d\n", volumeID) + fmt.Printf(" Needle ID: %d\n", needleID) + fmt.Printf(" Cookie: 0x%x\n", cookie) + fmt.Printf(" Offset: %d\n", offset) + fmt.Printf(" Size: %d bytes\n", size) + + if err := client.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + start := time.Now() + resp, err := client.ReadRange(ctx, volumeID, needleID, cookie, offset, size) + if err != nil { + return fmt.Errorf("read failed: %w", err) + } + + duration := time.Since(start) + + fmt.Printf("\nโ
RDMA read completed successfully!\n") + fmt.Printf(" Session ID: %s\n", resp.SessionID) + fmt.Printf(" Bytes Read: %d\n", resp.BytesRead) + fmt.Printf(" Duration: %v\n", duration) + fmt.Printf(" Transfer Rate: %.2f MB/s\n", resp.TransferRate) + fmt.Printf(" Success: %t\n", resp.Success) + fmt.Printf(" Message: %s\n", resp.Message) + + // Show first few bytes of data for verification + if len(resp.Data) > 0 { + displayLen := 32 + if len(resp.Data) < displayLen { + displayLen = len(resp.Data) + } + fmt.Printf(" Data (first %d bytes): %x\n", displayLen, resp.Data[:displayLen]) + } + + return nil + }, + } + + cmd.Flags().Uint32VarP(&volumeID, "volume", "v", 1, "Volume ID") + cmd.Flags().Uint64VarP(&needleID, "needle", "n", 100, "Needle ID") + cmd.Flags().Uint32VarP(&cookie, "cookie", "c", 0x12345678, "Needle cookie") + cmd.Flags().Uint64VarP(&offset, "offset", "o", 0, "Read offset") + cmd.Flags().Uint64VarP(&size, "size", "z", 4096, "Read size in bytes") + + return cmd +} + +func benchCmd() *cobra.Command { + var ( + iterations int + readSize uint64 + ) + + cmd := &cobra.Command{ + Use: "bench", + Short: "Benchmark RDMA read performance", + Long: "Run multiple RDMA read operations and measure performance statistics", + RunE: func(cmd *cobra.Command, args []string) error { + client := createClient() + defer client.Disconnect() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + fmt.Printf("๐ Starting RDMA read benchmark...\n") + fmt.Printf(" Iterations: %d\n", iterations) + fmt.Printf(" Read Size: %d bytes\n", readSize) + fmt.Printf(" Socket: %s\n", socketPath) + + if err := client.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + // Warmup + fmt.Printf("๐ฅ Warming up...\n") + for i := 0; i < 5; i++ { + _, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize) + if err != nil { + return fmt.Errorf("warmup read %d failed: %w", i+1, err) + } + } + + // Benchmark + fmt.Printf("๐ Running benchmark...\n") + var totalDuration time.Duration + var totalBytes uint64 + successful := 0 + + startTime := time.Now() + for i := 0; i < iterations; i++ { + opStart := time.Now() + resp, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize) + opDuration := time.Since(opStart) + + if err != nil { + fmt.Printf("โ Read %d failed: %v\n", i+1, err) + continue + } + + totalDuration += opDuration + totalBytes += resp.BytesRead + successful++ + + if (i+1)%10 == 0 || i == iterations-1 { + fmt.Printf(" Completed %d/%d reads\n", i+1, iterations) + } + } + benchDuration := time.Since(startTime) + + // Calculate statistics + avgLatency := totalDuration / time.Duration(successful) + throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024) + opsPerSec := float64(successful) / benchDuration.Seconds() + + fmt.Printf("\n๐ Benchmark Results:\n") + fmt.Printf(" Total Duration: %v\n", benchDuration) + fmt.Printf(" Successful Operations: %d/%d (%.1f%%)\n", successful, iterations, float64(successful)/float64(iterations)*100) + fmt.Printf(" Total Bytes Transferred: %d (%.1f MB)\n", totalBytes, float64(totalBytes)/(1024*1024)) + fmt.Printf(" Average Latency: %v\n", avgLatency) + fmt.Printf(" Throughput: %.2f MB/s\n", throughputMBps) + fmt.Printf(" Operations/sec: %.1f\n", opsPerSec) + + return nil + }, + } + + cmd.Flags().IntVarP(&iterations, "iterations", "i", 100, "Number of read operations") + cmd.Flags().Uint64VarP(&readSize, "read-size", "r", 4096, "Size of each read in bytes") + + return cmd +} + +func createClient() *rdma.Client { + logger := logrus.New() + if debug { + logger.SetLevel(logrus.DebugLevel) + } else { + logger.SetLevel(logrus.InfoLevel) + } + + config := &rdma.Config{ + EngineSocketPath: socketPath, + DefaultTimeout: timeout, + Logger: logger, + } + + return rdma.NewClient(config) +} |
