aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/cmd/sidecar/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'seaweedfs-rdma-sidecar/cmd/sidecar/main.go')
-rw-r--r--seaweedfs-rdma-sidecar/cmd/sidecar/main.go345
1 files changed, 345 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/cmd/sidecar/main.go b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go
new file mode 100644
index 000000000..55d98c4c6
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/cmd/sidecar/main.go
@@ -0,0 +1,345 @@
+// Package main provides the main RDMA sidecar service that integrates with SeaweedFS
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+
+ "seaweedfs-rdma-sidecar/pkg/rdma"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+)
+
+var (
+ port int
+ engineSocket string
+ debug bool
+ timeout time.Duration
+)
+
+// Response structs for JSON encoding
+type HealthResponse struct {
+ Status string `json:"status"`
+ RdmaEngineConnected bool `json:"rdma_engine_connected"`
+ RdmaEngineLatency string `json:"rdma_engine_latency"`
+ Timestamp string `json:"timestamp"`
+}
+
+type CapabilitiesResponse struct {
+ Version string `json:"version"`
+ DeviceName string `json:"device_name"`
+ VendorId uint32 `json:"vendor_id"`
+ MaxSessions uint32 `json:"max_sessions"`
+ MaxTransferSize uint64 `json:"max_transfer_size"`
+ ActiveSessions uint32 `json:"active_sessions"`
+ RealRdma bool `json:"real_rdma"`
+ PortGid string `json:"port_gid"`
+ PortLid uint16 `json:"port_lid"`
+ SupportedAuth []string `json:"supported_auth"`
+}
+
+type PingResponse struct {
+ Success bool `json:"success"`
+ EngineLatency string `json:"engine_latency"`
+ TotalLatency string `json:"total_latency"`
+ Timestamp string `json:"timestamp"`
+}
+
+func main() {
+ var rootCmd = &cobra.Command{
+ Use: "rdma-sidecar",
+ Short: "SeaweedFS RDMA acceleration sidecar",
+ Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine.
+
+This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance
+Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`,
+ RunE: runSidecar,
+ }
+
+ // Flags
+ rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port")
+ rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
+ rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
+ rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout")
+
+ if err := rootCmd.Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+ os.Exit(1)
+ }
+}
+
+func runSidecar(cmd *cobra.Command, args []string) error {
+ // Setup logging
+ logger := logrus.New()
+ if debug {
+ logger.SetLevel(logrus.DebugLevel)
+ logger.SetFormatter(&logrus.TextFormatter{
+ FullTimestamp: true,
+ ForceColors: true,
+ })
+ } else {
+ logger.SetLevel(logrus.InfoLevel)
+ }
+
+ logger.WithFields(logrus.Fields{
+ "port": port,
+ "engine_socket": engineSocket,
+ "debug": debug,
+ "timeout": timeout,
+ }).Info("🚀 Starting SeaweedFS RDMA Sidecar")
+
+ // Create RDMA client
+ rdmaConfig := &rdma.Config{
+ EngineSocketPath: engineSocket,
+ DefaultTimeout: timeout,
+ Logger: logger,
+ }
+
+ rdmaClient := rdma.NewClient(rdmaConfig)
+
+ // Connect to RDMA engine
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ logger.Info("🔗 Connecting to RDMA engine...")
+ if err := rdmaClient.Connect(ctx); err != nil {
+ return fmt.Errorf("failed to connect to RDMA engine: %w", err)
+ }
+ logger.Info("✅ Connected to RDMA engine successfully")
+
+ // Create HTTP server
+ sidecar := &Sidecar{
+ rdmaClient: rdmaClient,
+ logger: logger,
+ }
+
+ mux := http.NewServeMux()
+
+ // Health check endpoint
+ mux.HandleFunc("/health", sidecar.healthHandler)
+
+ // RDMA operations endpoints
+ mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler)
+ mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler)
+ mux.HandleFunc("/rdma/ping", sidecar.pingHandler)
+
+ server := &http.Server{
+ Addr: fmt.Sprintf(":%d", port),
+ Handler: mux,
+ }
+
+ // Handle graceful shutdown
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ logger.WithField("port", port).Info("🌐 HTTP server starting")
+ if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ logger.WithError(err).Fatal("HTTP server failed")
+ }
+ }()
+
+ // Wait for shutdown signal
+ <-sigChan
+ logger.Info("📡 Received shutdown signal, gracefully shutting down...")
+
+ // Shutdown HTTP server
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer shutdownCancel()
+
+ if err := server.Shutdown(shutdownCtx); err != nil {
+ logger.WithError(err).Error("HTTP server shutdown failed")
+ } else {
+ logger.Info("🌐 HTTP server shutdown complete")
+ }
+
+ // Disconnect from RDMA engine
+ rdmaClient.Disconnect()
+ logger.Info("🛑 RDMA sidecar shutdown complete")
+
+ return nil
+}
+
+// Sidecar represents the main sidecar service
+type Sidecar struct {
+ rdmaClient *rdma.Client
+ logger *logrus.Logger
+}
+
+// Health check handler
+func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
+ defer cancel()
+
+ // Test RDMA engine connectivity
+ if !s.rdmaClient.IsConnected() {
+ s.logger.Warn("⚠️ RDMA engine not connected")
+ http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable)
+ return
+ }
+
+ // Ping RDMA engine
+ latency, err := s.rdmaClient.Ping(ctx)
+ if err != nil {
+ s.logger.WithError(err).Error("❌ RDMA engine ping failed")
+ http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := HealthResponse{
+ Status: "healthy",
+ RdmaEngineConnected: true,
+ RdmaEngineLatency: latency.String(),
+ Timestamp: time.Now().Format(time.RFC3339),
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA capabilities handler
+func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ caps := s.rdmaClient.GetCapabilities()
+ if caps == nil {
+ http.Error(w, "No capabilities available", http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := CapabilitiesResponse{
+ Version: caps.Version,
+ DeviceName: caps.DeviceName,
+ VendorId: caps.VendorId,
+ MaxSessions: uint32(caps.MaxSessions),
+ MaxTransferSize: caps.MaxTransferSize,
+ ActiveSessions: uint32(caps.ActiveSessions),
+ RealRdma: caps.RealRdma,
+ PortGid: caps.PortGid,
+ PortLid: caps.PortLid,
+ SupportedAuth: caps.SupportedAuth,
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA ping handler
+func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
+ defer cancel()
+
+ start := time.Now()
+ latency, err := s.rdmaClient.Ping(ctx)
+ totalLatency := time.Since(start)
+
+ if err != nil {
+ s.logger.WithError(err).Error("❌ RDMA ping failed")
+ http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ response := PingResponse{
+ Success: true,
+ EngineLatency: latency.String(),
+ TotalLatency: totalLatency.String(),
+ Timestamp: time.Now().Format(time.RFC3339),
+ }
+ json.NewEncoder(w).Encode(response)
+}
+
+// RDMA read handler - uses GET method with query parameters for RESTful read operations
+func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse query parameters
+ query := r.URL.Query()
+
+ // Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier
+ fileID := query.Get("file_id")
+ if fileID == "" {
+ http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest)
+ return
+ }
+
+ // Parse optional offset and size parameters
+ offset := uint64(0) // default value
+ if offsetStr := query.Get("offset"); offsetStr != "" {
+ val, err := strconv.ParseUint(offsetStr, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
+ return
+ }
+ offset = val
+ }
+
+ size := uint64(4096) // default value
+ if sizeStr := query.Get("size"); sizeStr != "" {
+ val, err := strconv.ParseUint(sizeStr, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
+ return
+ }
+ size = val
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "file_id": fileID,
+ "offset": offset,
+ "size": size,
+ }).Info("📖 Processing RDMA read request")
+
+ ctx, cancel := context.WithTimeout(r.Context(), timeout)
+ defer cancel()
+
+ start := time.Now()
+ resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size)
+ duration := time.Since(start)
+
+ if err != nil {
+ s.logger.WithError(err).Error("❌ RDMA read failed")
+ http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError)
+ return
+ }
+
+ s.logger.WithFields(logrus.Fields{
+ "file_id": fileID,
+ "bytes_read": resp.BytesRead,
+ "duration": duration,
+ "transfer_rate": resp.TransferRate,
+ "session_id": resp.SessionID,
+ }).Info("✅ RDMA read completed successfully")
+
+ // Set response headers
+ w.Header().Set("Content-Type", "application/octet-stream")
+ w.Header().Set("X-RDMA-Session-ID", resp.SessionID)
+ w.Header().Set("X-RDMA-Duration", duration.String())
+ w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate))
+ w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead))
+
+ // Write the data
+ w.Write(resp.Data)
+}