aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/pkg/ipc/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'seaweedfs-rdma-sidecar/pkg/ipc/client.go')
-rw-r--r--seaweedfs-rdma-sidecar/pkg/ipc/client.go331
1 files changed, 331 insertions, 0 deletions
diff --git a/seaweedfs-rdma-sidecar/pkg/ipc/client.go b/seaweedfs-rdma-sidecar/pkg/ipc/client.go
new file mode 100644
index 000000000..b2c1d2db1
--- /dev/null
+++ b/seaweedfs-rdma-sidecar/pkg/ipc/client.go
@@ -0,0 +1,331 @@
+package ipc
+
+import (
+ "context"
+ "encoding/binary"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/vmihailenco/msgpack/v5"
+)
+
+// Client provides IPC communication with the Rust RDMA engine
+type Client struct {
+ socketPath string
+ conn net.Conn
+ mu sync.RWMutex
+ logger *logrus.Logger
+ connected bool
+}
+
+// NewClient creates a new IPC client
+func NewClient(socketPath string, logger *logrus.Logger) *Client {
+ if logger == nil {
+ logger = logrus.New()
+ logger.SetLevel(logrus.InfoLevel)
+ }
+
+ return &Client{
+ socketPath: socketPath,
+ logger: logger,
+ }
+}
+
+// Connect establishes connection to the Rust RDMA engine
+func (c *Client) Connect(ctx context.Context) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.connected {
+ return nil
+ }
+
+ c.logger.WithField("socket", c.socketPath).Info("🔗 Connecting to Rust RDMA engine")
+
+ dialer := &net.Dialer{}
+ conn, err := dialer.DialContext(ctx, "unix", c.socketPath)
+ if err != nil {
+ c.logger.WithError(err).Error("❌ Failed to connect to RDMA engine")
+ return fmt.Errorf("failed to connect to RDMA engine at %s: %w", c.socketPath, err)
+ }
+
+ c.conn = conn
+ c.connected = true
+ c.logger.Info("✅ Connected to Rust RDMA engine")
+
+ return nil
+}
+
+// Disconnect closes the connection
+func (c *Client) Disconnect() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.conn != nil {
+ c.conn.Close()
+ c.conn = nil
+ c.connected = false
+ c.logger.Info("🔌 Disconnected from Rust RDMA engine")
+ }
+}
+
+// IsConnected returns connection status
+func (c *Client) IsConnected() bool {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.connected
+}
+
+// SendMessage sends an IPC message and waits for response
+func (c *Client) SendMessage(ctx context.Context, msg *IpcMessage) (*IpcMessage, error) {
+ c.mu.RLock()
+ conn := c.conn
+ connected := c.connected
+ c.mu.RUnlock()
+
+ if !connected || conn == nil {
+ return nil, fmt.Errorf("not connected to RDMA engine")
+ }
+
+ // Set write timeout
+ if deadline, ok := ctx.Deadline(); ok {
+ conn.SetWriteDeadline(deadline)
+ } else {
+ conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
+ }
+
+ c.logger.WithField("type", msg.Type).Debug("📤 Sending message to Rust engine")
+
+ // Serialize message with MessagePack
+ data, err := msgpack.Marshal(msg)
+ if err != nil {
+ c.logger.WithError(err).Error("❌ Failed to marshal message")
+ return nil, fmt.Errorf("failed to marshal message: %w", err)
+ }
+
+ // Send message length (4 bytes) + message data
+ lengthBytes := make([]byte, 4)
+ binary.LittleEndian.PutUint32(lengthBytes, uint32(len(data)))
+
+ if _, err := conn.Write(lengthBytes); err != nil {
+ c.logger.WithError(err).Error("❌ Failed to send message length")
+ return nil, fmt.Errorf("failed to send message length: %w", err)
+ }
+
+ if _, err := conn.Write(data); err != nil {
+ c.logger.WithError(err).Error("❌ Failed to send message data")
+ return nil, fmt.Errorf("failed to send message data: %w", err)
+ }
+
+ c.logger.WithFields(logrus.Fields{
+ "type": msg.Type,
+ "size": len(data),
+ }).Debug("📤 Message sent successfully")
+
+ // Read response
+ return c.readResponse(ctx, conn)
+}
+
+// readResponse reads and deserializes the response message
+func (c *Client) readResponse(ctx context.Context, conn net.Conn) (*IpcMessage, error) {
+ // Set read timeout
+ if deadline, ok := ctx.Deadline(); ok {
+ conn.SetReadDeadline(deadline)
+ } else {
+ conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+ }
+
+ // Read message length (4 bytes)
+ lengthBytes := make([]byte, 4)
+ if _, err := conn.Read(lengthBytes); err != nil {
+ c.logger.WithError(err).Error("❌ Failed to read response length")
+ return nil, fmt.Errorf("failed to read response length: %w", err)
+ }
+
+ length := binary.LittleEndian.Uint32(lengthBytes)
+ if length > 64*1024*1024 { // 64MB sanity check
+ c.logger.WithField("length", length).Error("❌ Response message too large")
+ return nil, fmt.Errorf("response message too large: %d bytes", length)
+ }
+
+ // Read message data
+ data := make([]byte, length)
+ if _, err := conn.Read(data); err != nil {
+ c.logger.WithError(err).Error("❌ Failed to read response data")
+ return nil, fmt.Errorf("failed to read response data: %w", err)
+ }
+
+ c.logger.WithField("size", length).Debug("📥 Response received")
+
+ // Deserialize with MessagePack
+ var response IpcMessage
+ if err := msgpack.Unmarshal(data, &response); err != nil {
+ c.logger.WithError(err).Error("❌ Failed to unmarshal response")
+ return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+ }
+
+ c.logger.WithField("type", response.Type).Debug("📥 Response deserialized successfully")
+
+ return &response, nil
+}
+
+// High-level convenience methods
+
+// Ping sends a ping message to test connectivity
+func (c *Client) Ping(ctx context.Context, clientID *string) (*PongResponse, error) {
+ msg := NewPingMessage(clientID)
+
+ response, err := c.SendMessage(ctx, msg)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.Type == MsgError {
+ errorData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal engine error data: %w", err)
+ }
+ var errorResp ErrorResponse
+ if err := msgpack.Unmarshal(errorData, &errorResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal engine error response: %w", err)
+ }
+ return nil, fmt.Errorf("engine error: %s - %s", errorResp.Code, errorResp.Message)
+ }
+
+ if response.Type != MsgPong {
+ return nil, fmt.Errorf("unexpected response type: %s", response.Type)
+ }
+
+ // Convert response data to PongResponse
+ pongData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal pong data: %w", err)
+ }
+
+ var pong PongResponse
+ if err := msgpack.Unmarshal(pongData, &pong); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal pong response: %w", err)
+ }
+
+ return &pong, nil
+}
+
+// GetCapabilities requests engine capabilities
+func (c *Client) GetCapabilities(ctx context.Context, clientID *string) (*GetCapabilitiesResponse, error) {
+ msg := NewGetCapabilitiesMessage(clientID)
+
+ response, err := c.SendMessage(ctx, msg)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.Type == MsgError {
+ errorData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal engine error data: %w", err)
+ }
+ var errorResp ErrorResponse
+ if err := msgpack.Unmarshal(errorData, &errorResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal engine error response: %w", err)
+ }
+ return nil, fmt.Errorf("engine error: %s - %s", errorResp.Code, errorResp.Message)
+ }
+
+ if response.Type != MsgGetCapabilitiesResponse {
+ return nil, fmt.Errorf("unexpected response type: %s", response.Type)
+ }
+
+ // Convert response data to GetCapabilitiesResponse
+ capsData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal capabilities data: %w", err)
+ }
+
+ var caps GetCapabilitiesResponse
+ if err := msgpack.Unmarshal(capsData, &caps); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal capabilities response: %w", err)
+ }
+
+ return &caps, nil
+}
+
+// StartRead initiates an RDMA read operation
+func (c *Client) StartRead(ctx context.Context, req *StartReadRequest) (*StartReadResponse, error) {
+ msg := NewStartReadMessage(req)
+
+ response, err := c.SendMessage(ctx, msg)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.Type == MsgError {
+ errorData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal engine error data: %w", err)
+ }
+ var errorResp ErrorResponse
+ if err := msgpack.Unmarshal(errorData, &errorResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal engine error response: %w", err)
+ }
+ return nil, fmt.Errorf("engine error: %s - %s", errorResp.Code, errorResp.Message)
+ }
+
+ if response.Type != MsgStartReadResponse {
+ return nil, fmt.Errorf("unexpected response type: %s", response.Type)
+ }
+
+ // Convert response data to StartReadResponse
+ startData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal start read data: %w", err)
+ }
+
+ var startResp StartReadResponse
+ if err := msgpack.Unmarshal(startData, &startResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal start read response: %w", err)
+ }
+
+ return &startResp, nil
+}
+
+// CompleteRead completes an RDMA read operation
+func (c *Client) CompleteRead(ctx context.Context, sessionID string, success bool, bytesTransferred uint64, clientCrc *uint32) (*CompleteReadResponse, error) {
+ msg := NewCompleteReadMessage(sessionID, success, bytesTransferred, clientCrc, nil)
+
+ response, err := c.SendMessage(ctx, msg)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.Type == MsgError {
+ errorData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal engine error data: %w", err)
+ }
+ var errorResp ErrorResponse
+ if err := msgpack.Unmarshal(errorData, &errorResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal engine error response: %w", err)
+ }
+ return nil, fmt.Errorf("engine error: %s - %s", errorResp.Code, errorResp.Message)
+ }
+
+ if response.Type != MsgCompleteReadResponse {
+ return nil, fmt.Errorf("unexpected response type: %s", response.Type)
+ }
+
+ // Convert response data to CompleteReadResponse
+ completeData, err := msgpack.Marshal(response.Data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal complete read data: %w", err)
+ }
+
+ var completeResp CompleteReadResponse
+ if err := msgpack.Unmarshal(completeData, &completeResp); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal complete read response: %w", err)
+ }
+
+ return &completeResp, nil
+}