aboutsummaryrefslogtreecommitdiff
path: root/seaweedfs-rdma-sidecar/pkg/rdma/client.go
blob: 156bb54971bcc20adc7521f3cf7af5577342707d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
// Package rdma provides high-level RDMA operations for SeaweedFS integration
package rdma

import (
	"context"
	"fmt"
	"sync"
	"time"

	"seaweedfs-rdma-sidecar/pkg/ipc"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/sirupsen/logrus"
)

// PooledConnection represents a pooled RDMA connection: one IPC client to the
// RDMA engine plus the bookkeeping ConnectionPool uses to hand it out.
// inUse and lastUsed are updated by ConnectionPool methods while holding the
// pool's mutex.
type PooledConnection struct {
	// ipcClient is the underlying IPC connection to the RDMA engine.
	ipcClient *ipc.Client
	// lastUsed is stamped on checkout and release; compared against the
	// pool's maxIdleTime to decide reuse and cleanup.
	lastUsed  time.Time
	// inUse is true while the connection is checked out and not yet released.
	inUse     bool
	// sessionID is an identifier assigned by the pool when the connection is
	// created; used in logs and returned as ReadResponse.SessionID for
	// pooled reads.
	sessionID string
	// created records when the connection was made; only written, never read,
	// within this file.
	created   time.Time
}

// ConnectionPool manages a bounded set of IPC connections to the RDMA engine.
// Connections are created lazily by getConnection, handed back with
// releaseConnection, and pruned by cleanup once idle for maxIdleTime or longer.
type ConnectionPool struct {
	// connections holds every pooled connection, both checked out and idle.
	connections    []*PooledConnection
	// mutex guards connections and each entry's inUse/lastUsed fields.
	// Only the exclusive Lock is used in this file.
	mutex          sync.RWMutex
	// maxConnections caps len(connections); NewConnectionPool maps values <= 0 to 10.
	maxConnections int
	// maxIdleTime is how long an idle connection may sit before it is no
	// longer reused and is removed by cleanup; values <= 0 become 5 minutes.
	maxIdleTime    time.Duration
	// enginePath is the endpoint passed to ipc.NewClient for new connections.
	enginePath     string
	logger         *logrus.Logger
}

// Client provides high-level RDMA operations with connection pooling.
//
// NewClient puts a Client in one of two modes:
//   - pooling mode: pool is non-nil and ipcClient is left nil; reads check out
//     a connection from pool.
//   - single-connection mode: pool is nil and operations share ipcClient.
//
// NOTE(review): connected is a plain bool written by Connect/Disconnect and
// read by IsConnected without synchronization; confirm these are not called
// concurrently.
type Client struct {
	// pool is non-nil only in pooling mode.
	pool           *ConnectionPool
	logger         *logrus.Logger
	// enginePath is a copy of Config.EngineSocketPath; no method in this file reads it.
	enginePath     string
	// capabilities is filled by Connect in single-connection mode only; it stays nil otherwise.
	capabilities   *ipc.GetCapabilitiesResponse
	// connected is set by a successful Connect and cleared by Disconnect.
	connected      bool
	// defaultTimeout is sent to the engine as whole seconds in each StartReadRequest.
	defaultTimeout time.Duration

	// Legacy single connection (for backward compatibility); nil in pooling mode.
	ipcClient *ipc.Client
}

// Config holds configuration for the RDMA client. NewClient fills in some
// zero-valued fields (Logger, DefaultTimeout) on the Config it is given.
type Config struct {
	// EngineSocketPath is the engine IPC endpoint handed to ipc.NewClient.
	EngineSocketPath string
	// DefaultTimeout is forwarded to the engine as StartReadRequest.TimeoutSecs
	// (truncated to whole seconds). Zero means 30 seconds.
	DefaultTimeout   time.Duration
	// Logger receives client and pool logs. Nil means a new Info-level logrus logger.
	Logger           *logrus.Logger

	// Connection pooling options.
	//
	// NOTE: NewClient enables pooling whenever EnablePooling is true OR both
	// MaxConnections and MaxIdleTime are zero. Because false is EnablePooling's
	// zero value, single-connection mode requires EnablePooling=false together
	// with a non-zero MaxConnections or MaxIdleTime.
	EnablePooling  bool          // Enable connection pooling (effectively default: true; see NOTE)
	MaxConnections int           // Max connections in pool (<= 0 means 10)
	MaxIdleTime    time.Duration // Max idle time before a connection is dropped (<= 0 means 5min)
}

// ReadRequest represents a SeaweedFS needle read request. All fields are
// copied into the ipc.StartReadRequest sent to the engine.
type ReadRequest struct {
	VolumeID  uint32  // volume ID (as parsed from a SeaweedFS file ID)
	NeedleID  uint64  // needle key within the volume
	Cookie    uint32  // needle cookie from the file ID
	Offset    uint64  // byte offset within the needle
	Size      uint64  // bytes to read; ReadFull sends 0 to request the whole needle
	AuthToken *string // optional token forwarded to the engine; nil when unset
}

// ReadResponse represents the result of an RDMA read operation. Read only
// returns one together with a nil error, in which case Success is true.
type ReadResponse struct {
	Data         []byte        // payload; currently locally generated placeholder bytes (mock), TransferSize long
	BytesRead    uint64        // transfer size reported in the engine's StartRead response
	Duration     time.Duration // wall time measured from the start of Client.Read
	TransferRate float64       // bytes per second (BytesRead / Duration)
	SessionID    string        // engine session ID (single-connection mode) or pool connection ID (pooling mode)
	Success      bool          // always true when returned without an error
	Message      string        // human-readable status message
}

// NewConnectionPool builds an empty pool whose connections are dialed on
// demand against enginePath. A non-positive maxConnections falls back to 10
// and a non-positive maxIdleTime falls back to 5 minutes.
func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool {
	const (
		fallbackMaxConnections = 10
		fallbackMaxIdleTime    = 5 * time.Minute
	)

	pool := &ConnectionPool{
		maxConnections: fallbackMaxConnections,
		maxIdleTime:    fallbackMaxIdleTime,
		enginePath:     enginePath,
		logger:         logger,
	}
	if maxConnections > 0 {
		pool.maxConnections = maxConnections
	}
	if maxIdleTime > 0 {
		pool.maxIdleTime = maxIdleTime
	}

	// Pre-size for the largest pool we will ever hold.
	pool.connections = make([]*PooledConnection, 0, pool.maxConnections)
	return pool
}

// getConnection checks out a connection from the pool. If no reusable
// connection exists and the pool is below maxConnections, it dials a new one.
//
// While scanning, idle connections that have been unused for maxIdleTime or
// longer, or whose IPC client no longer reports itself connected, are
// disconnected and evicted. That way they stop occupying a slot toward
// maxConnections instead of lingering until the periodic cleanup.
//
// The returned connection is marked in use; callers must hand it back with
// releaseConnection. If every slot is held by an in-use connection, an error
// is returned immediately — this method does not wait for a slot.
//
// NOTE(review): a new connection is dialed while p.mutex is held, so a slow
// Connect blocks other checkouts and releases for its duration.
func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	now := time.Now()

	// Single pass: keep in-use and healthy idle connections, evict stale
	// ones, and remember the first healthy idle one for reuse. Filtering in
	// place reuses the backing array.
	var reusable *PooledConnection
	live := p.connections[:0]
	for _, conn := range p.connections {
		idle := now.Sub(conn.lastUsed)
		switch {
		case conn.inUse:
			live = append(live, conn)
		case idle >= p.maxIdleTime || !conn.ipcClient.IsConnected():
			// Stale: close it so it no longer counts toward maxConnections.
			conn.ipcClient.Disconnect()
			p.logger.WithFields(logrus.Fields{
				"session_id": conn.sessionID,
				"idle_time":  idle,
			}).Debug("🧹 Evicted stale RDMA connection")
		default:
			if reusable == nil {
				reusable = conn
			}
			live = append(live, conn)
		}
	}
	// Nil out the vacated tail so evicted connections can be garbage collected.
	for i := len(live); i < len(p.connections); i++ {
		p.connections[i] = nil
	}
	p.connections = live

	if reusable != nil {
		reusable.inUse = true
		reusable.lastUsed = now
		p.logger.WithField("session_id", reusable.sessionID).Debug("🔌 Reusing pooled RDMA connection")
		return reusable, nil
	}

	if len(p.connections) >= p.maxConnections {
		// Every slot is held by an in-use connection; fail fast rather than wait.
		return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections)
	}

	ipcClient := ipc.NewClient(p.enginePath, p.logger)
	if err := ipcClient.Connect(ctx); err != nil {
		return nil, fmt.Errorf("failed to create new pooled connection: %w", err)
	}

	// UnixNano (rather than Unix seconds) keeps IDs distinct when a slot index
	// is reused within the same second after an eviction.
	created := time.Now()
	conn := &PooledConnection{
		ipcClient: ipcClient,
		lastUsed:  created,
		inUse:     true,
		sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), created.UnixNano()),
		created:   created,
	}

	p.connections = append(p.connections, conn)
	p.logger.WithFields(logrus.Fields{
		"session_id": conn.sessionID,
		"pool_size":  len(p.connections),
	}).Info("🚀 Created new pooled RDMA connection")

	return conn, nil
}

// releaseConnection hands conn back to the pool. It clears the in-use flag
// and refreshes lastUsed, so the connection becomes eligible for reuse by
// getConnection and its idle timer restarts.
func (p *ConnectionPool) releaseConnection(conn *PooledConnection) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	releasedAt := time.Now()
	conn.inUse = false
	conn.lastUsed = releasedAt

	entry := p.logger.WithField("session_id", conn.sessionID)
	entry.Debug("🔄 Released RDMA connection back to pool")
}

// cleanup disconnects and drops every idle connection whose last use was
// maxIdleTime or more ago. Checked-out connections are always kept,
// regardless of age.
func (p *ConnectionPool) cleanup() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	now := time.Now()
	survivors := make([]*PooledConnection, 0, len(p.connections))

	for _, pc := range p.connections {
		idleFor := now.Sub(pc.lastUsed)
		if !pc.inUse && idleFor >= p.maxIdleTime {
			// Expired and not checked out: close the IPC link and drop it.
			pc.ipcClient.Disconnect()
			p.logger.WithFields(logrus.Fields{
				"session_id": pc.sessionID,
				"idle_time":  idleFor,
			}).Debug("🧹 Cleaned up idle RDMA connection")
			continue
		}
		survivors = append(survivors, pc)
	}

	p.connections = survivors
}

// Close disconnects every pooled connection, including any currently
// checked out, and empties the pool.
//
// NOTE(review): the pool is not marked closed, so a later getConnection will
// dial fresh connections again.
func (p *ConnectionPool) Close() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	for i := range p.connections {
		p.connections[i].ipcClient.Disconnect()
	}
	p.connections = nil

	p.logger.Info("🔌 Connection pool closed")
}

// NewClient creates a new RDMA client from config.
//
// Missing settings are filled in on config itself:
//   - a nil Logger is replaced with a new Info-level logrus logger;
//   - a non-positive DefaultTimeout becomes 30 seconds.
//
// A nil config is treated as an empty Config.
//
// Pooling is enabled when config.EnablePooling is true OR when both
// MaxConnections and MaxIdleTime are zero. Because EnablePooling's zero value
// is false, an all-default Config gets pooling. Legacy single-connection mode
// requires EnablePooling=false plus a non-zero MaxConnections or MaxIdleTime.
//
// In pooling mode a background goroutine prunes idle connections once a
// minute (see startCleanupRoutine).
func NewClient(config *Config) *Client {
	if config == nil {
		config = &Config{}
	}

	if config.Logger == nil {
		config.Logger = logrus.New()
		config.Logger.SetLevel(logrus.InfoLevel)
	}

	// A negative timeout would otherwise reach the engine through an
	// implementation-defined float->uint64 conversion; treat it as unset.
	if config.DefaultTimeout <= 0 {
		config.DefaultTimeout = 30 * time.Second
	}

	client := &Client{
		logger:         config.Logger,
		enginePath:     config.EngineSocketPath,
		defaultTimeout: config.DefaultTimeout,
	}

	// EnablePooling=false is indistinguishable from "unset", so pooling is
	// forced on when no pool sizing was configured either.
	enablePooling := config.EnablePooling ||
		(config.MaxConnections == 0 && config.MaxIdleTime == 0)

	if !enablePooling {
		// Legacy single connection mode
		client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger)
		config.Logger.Info("🔌 RDMA single connection mode (pooling disabled)")
		return client
	}

	client.pool = NewConnectionPool(
		config.EngineSocketPath,
		config.MaxConnections,
		config.MaxIdleTime,
		config.Logger,
	)

	// Start the background idle-connection sweeper.
	go client.startCleanupRoutine()

	config.Logger.WithFields(logrus.Fields{
		"max_connections": client.pool.maxConnections,
		"max_idle_time":   client.pool.maxIdleTime,
	}).Info("🔌 RDMA connection pooling enabled")

	return client
}

// startCleanupRoutine launches a background goroutine that calls
// pool.cleanup once a minute, then returns immediately.
//
// NOTE(review): the goroutine and its ticker are never stopped — nothing,
// including Disconnect, signals it — so each pooling Client keeps one
// goroutine alive for the life of the process.
func (c *Client) startCleanupRoutine() {
	const cleanupInterval = 1 * time.Minute
	ticker := time.NewTicker(cleanupInterval)

	sweep := func() {
		defer ticker.Stop()
		for {
			<-ticker.C
			if pool := c.pool; pool != nil {
				pool.cleanup()
			}
		}
	}
	go sweep()
}

// Connect prepares the client for use.
//
// In pooling mode no connection is opened here. The client is simply marked
// connected, and pooled connections are dialed on first use. Capabilities are
// not queried in this mode, so GetCapabilities keeps returning nil.
//
// In single-connection mode Connect dials the engine, pings it, and fetches
// and stores its capabilities. If the ping or the capabilities query fails,
// the IPC connection is closed again and the error is returned.
func (c *Client) Connect(ctx context.Context) error {
	c.logger.Info("🚀 Connecting to RDMA engine")

	if c.pool != nil {
		// Connection pooling mode - connections are created on-demand
		c.connected = true
		c.logger.Info("✅ RDMA client ready (connection pooling enabled)")
		return nil
	}

	// Single connection mode
	if err := c.ipcClient.Connect(ctx); err != nil {
		return fmt.Errorf("failed to connect to IPC: %w", err)
	}

	clientID := "rdma-client"

	// latency is the client-observed round trip of the Ping call;
	// server_rtt is the figure reported by the engine itself.
	pingStart := time.Now()
	pong, err := c.ipcClient.Ping(ctx, &clientID)
	latency := time.Since(pingStart)
	if err != nil {
		c.ipcClient.Disconnect()
		return fmt.Errorf("failed to ping RDMA engine: %w", err)
	}

	c.logger.WithFields(logrus.Fields{
		"latency":    latency,
		"server_rtt": time.Duration(pong.ServerRttNs),
	}).Info("📡 RDMA engine ping successful")

	caps, err := c.ipcClient.GetCapabilities(ctx, &clientID)
	if err != nil {
		c.ipcClient.Disconnect()
		return fmt.Errorf("failed to get engine capabilities: %w", err)
	}

	c.capabilities = caps
	c.connected = true

	c.logger.WithFields(logrus.Fields{
		"version":           caps.Version,
		"device_name":       caps.DeviceName,
		"vendor_id":         caps.VendorId,
		"max_sessions":      caps.MaxSessions,
		"max_transfer_size": caps.MaxTransferSize,
		"active_sessions":   caps.ActiveSessions,
		"real_rdma":         caps.RealRdma,
		"port_gid":          caps.PortGid,
		"port_lid":          caps.PortLid,
	}).Info("✅ RDMA engine connected and ready")

	return nil
}

// Disconnect tears down the client's connection(s) to the RDMA engine and
// clears the connected flag. In pooling mode the whole pool is closed; in
// single-connection mode the shared IPC client is disconnected. Calling it
// while not connected is a no-op.
func (c *Client) Disconnect() {
	if !c.connected {
		return
	}

	if c.pool == nil {
		c.ipcClient.Disconnect()
		c.logger.Info("🔌 Disconnected from RDMA engine")
	} else {
		c.pool.Close()
		c.logger.Info("🔌 Disconnected from RDMA engine (pool closed)")
	}

	c.connected = false
}

// IsConnected reports whether the client is ready for RDMA operations.
//
// In pooling mode this is only the flag set by Connect and cleared by
// Disconnect. Pooled connections are dialed lazily and are not checked here.
// In single-connection mode, the underlying IPC client must also report that
// it is connected.
func (c *Client) IsConnected() bool {
	if c.pool != nil {
		return c.connected
	}
	return c.connected && c.ipcClient.IsConnected()
}

// GetCapabilities returns the engine capabilities captured by Connect.
//
// Only single-connection mode queries capabilities. In pooling mode Connect
// returns before doing so, and this returns nil. It is also nil before the
// first successful Connect.
func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse {
	caps := c.capabilities
	return caps
}

// Read performs an RDMA read of a SeaweedFS needle, or a byte range of it.
//
// The request is sent to the engine as a StartRead followed by a
// CompleteRead. In pooling mode the work is delegated to readWithPool;
// otherwise the client's single IPC connection is used.
//
// An error is returned when:
//   - the client is not connected;
//   - req is nil;
//   - either IPC call fails;
//   - the engine reports an unsuccessful completion.
//
// NOTE: ReadResponse.Data is currently placeholder data generated locally
// (see the MOCK DATA section below), not bytes transferred from the engine.
func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
	if !c.IsConnected() {
		return nil, fmt.Errorf("not connected to RDMA engine")
	}
	if req == nil {
		return nil, fmt.Errorf("nil read request")
	}

	startTime := time.Now()

	c.logger.WithFields(logrus.Fields{
		"volume_id": req.VolumeID,
		"needle_id": req.NeedleID,
		"offset":    req.Offset,
		"size":      req.Size,
	}).Debug("📖 Starting RDMA read operation")

	if c.pool != nil {
		// Connection pooling mode
		return c.readWithPool(ctx, req, startTime)
	}

	// Single connection mode
	ipcReq := &ipc.StartReadRequest{
		VolumeID:    req.VolumeID,
		NeedleID:    req.NeedleID,
		Cookie:      req.Cookie,
		Offset:      req.Offset,
		Size:        req.Size,
		RemoteAddr:  0, // Will be set by engine (mock for now)
		RemoteKey:   0, // Will be set by engine (mock for now)
		TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
		AuthToken:   req.AuthToken,
	}

	startResp, err := c.ipcClient.StartRead(ctx, ipcReq)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to start RDMA read")
		return nil, fmt.Errorf("failed to start RDMA read: %w", err)
	}

	// In the current protocol, receiving a StartReadResponse means the
	// session was started successfully.
	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"local_addr":    fmt.Sprintf("0x%x", startResp.LocalAddr),
		"local_key":     startResp.LocalKey,
		"transfer_size": startResp.TransferSize,
		"expected_crc":  fmt.Sprintf("0x%x", startResp.ExpectedCrc),
		"expires_at":    time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
	}).Debug("📖 RDMA read session started")

	completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to complete RDMA read")
		return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
	}

	duration := time.Since(startTime)

	if !completeResp.Success {
		errorMsg := "unknown error"
		if completeResp.Message != nil {
			errorMsg = *completeResp.Message
		}
		c.logger.WithFields(logrus.Fields{
			"session_id":    startResp.SessionID,
			"error_message": errorMsg,
		}).Error("❌ RDMA read completion failed")
		return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
	}

	// Transfer rate in bytes/second. Guard against a zero elapsed time (coarse
	// clocks): dividing by it yields +Inf or NaN, which e.g. encoding/json
	// refuses to marshal.
	var transferRate float64
	if secs := duration.Seconds(); secs > 0 {
		transferRate = float64(startResp.TransferSize) / secs
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"bytes_read":    startResp.TransferSize,
		"duration":      duration,
		"transfer_rate": transferRate,
		"server_crc":    completeResp.ServerCrc,
	}).Info("✅ RDMA read completed successfully")

	// MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
	//
	// This section generates placeholder data for the mock RDMA implementation.
	// In a production RDMA implementation, this should be replaced with:
	//
	// 1. The actual data transferred via RDMA from the remote memory region
	// 2. Data validation using checksums/CRC from the RDMA completion
	// 3. Proper error handling for RDMA transfer failures
	// 4. Memory region cleanup and deregistration
	//
	// TODO for real RDMA implementation:
	// - Replace mockData with actual RDMA buffer contents
	// - Validate data integrity using server CRC: completeResp.ServerCrc
	// - Handle partial transfers and retry logic
	// - Implement proper memory management for RDMA regions
	//
	// Current mock behavior: Generates a simple pattern (0,1,2...255,0,1,2...)
	// This allows testing of the integration pipeline without real hardware
	mockData := make([]byte, startResp.TransferSize)
	for i := range mockData {
		mockData[i] = byte(i % 256) // Simple repeating pattern for verification
	}
	// END MOCK DATA IMPLEMENTATION

	return &ReadResponse{
		Data:         mockData,
		BytesRead:    startResp.TransferSize,
		Duration:     duration,
		TransferRate: transferRate,
		SessionID:    startResp.SessionID,
		Success:      true,
		Message:      "RDMA read completed successfully",
	}, nil
}

// ReadRange reads size bytes starting at offset within the needle identified
// by volumeID, needleID and cookie. No auth token is attached to the request.
func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) {
	return c.Read(ctx, &ReadRequest{
		VolumeID: volumeID,
		NeedleID: needleID,
		Cookie:   cookie,
		Offset:   offset,
		Size:     size,
	})
}

// ReadFileRange reads a byte range from the needle named by a SeaweedFS file
// ID such as "3,01637037d6". The ID is decoded with parseFileID, and the read
// is then delegated to ReadRange. No auth token is attached.
func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) {
	volumeID, needleID, cookie, err := parseFileID(fileID)
	if err != nil {
		return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
	}
	return c.ReadRange(ctx, volumeID, needleID, cookie, offset, size)
}

// parseFileID splits a SeaweedFS file ID (e.g. "3,01637037d6") into its
// volume ID, needle key and cookie. Parsing is delegated to
// needle.ParseFileIdFromString, so the accepted syntax matches SeaweedFS
// itself.
func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) {
	fid, parseErr := needle.ParseFileIdFromString(fileId)
	if parseErr != nil {
		return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, parseErr)
	}
	return uint32(fid.VolumeId), uint64(fid.Key), uint32(fid.Cookie), nil
}

// ReadFull reads an entire needle. It issues a Read with Offset 0 and Size 0;
// a zero Size is this package's way of asking for the whole needle (the
// engine is expected to honour that).
func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) {
	const wholeNeedle = 0 // 0 means read entire needle
	return c.Read(ctx, &ReadRequest{
		VolumeID: volumeID,
		NeedleID: needleID,
		Cookie:   cookie,
		Offset:   0,
		Size:     wholeNeedle,
	})
}

// Ping measures the round-trip latency to the RDMA engine.
//
// In single-connection mode the client's own IPC connection is used. In
// pooling mode c.ipcClient is nil, so the ping goes over a connection
// checked out from the pool and returned afterwards.
//
// The returned duration is the client-observed time of the Ping call itself.
// It excludes any time spent obtaining a pooled connection.
func (c *Client) Ping(ctx context.Context) (time.Duration, error) {
	if !c.IsConnected() {
		return 0, fmt.Errorf("not connected to RDMA engine")
	}

	ipcClient := c.ipcClient
	if c.pool != nil {
		conn, err := c.pool.getConnection(ctx)
		if err != nil {
			return 0, fmt.Errorf("failed to get pooled connection: %w", err)
		}
		defer c.pool.releaseConnection(conn)
		ipcClient = conn.ipcClient
	}

	clientID := "health-check"
	start := time.Now()
	pong, err := ipcClient.Ping(ctx, &clientID)
	if err != nil {
		return 0, err
	}

	totalLatency := time.Since(start)
	serverRtt := time.Duration(pong.ServerRttNs)

	c.logger.WithFields(logrus.Fields{
		"total_latency": totalLatency,
		"server_rtt":    serverRtt,
		"client_id":     clientID,
	}).Debug("🏓 RDMA engine ping successful")

	return totalLatency, nil
}

// readWithPool performs Read's StartRead/CompleteRead exchange over a
// connection checked out from c.pool. The connection is always returned to
// the pool when this function exits, including on error.
//
// startTime is the moment Read began, so Duration and TransferRate include
// any time spent obtaining a pooled connection.
//
// ReadResponse.SessionID is the pool's connection ID, not the engine's RDMA
// session ID; the latter is logged as "rdma_session_id".
//
// NOTE: ReadResponse.Data is placeholder data generated locally, not bytes
// transferred from the engine.
func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) {
	conn, err := c.pool.getConnection(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get pooled connection: %w", err)
	}
	defer c.pool.releaseConnection(conn)

	c.logger.WithField("session_id", conn.sessionID).Debug("🔌 Using pooled RDMA connection")

	ipcReq := &ipc.StartReadRequest{
		VolumeID:    req.VolumeID,
		NeedleID:    req.NeedleID,
		Cookie:      req.Cookie,
		Offset:      req.Offset,
		Size:        req.Size,
		RemoteAddr:  0, // Will be set by engine (mock for now)
		RemoteKey:   0, // Will be set by engine (mock for now)
		TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
		AuthToken:   req.AuthToken,
	}

	startResp, err := conn.ipcClient.StartRead(ctx, ipcReq)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to start RDMA read (pooled)")
		return nil, fmt.Errorf("failed to start RDMA read: %w", err)
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"local_addr":    fmt.Sprintf("0x%x", startResp.LocalAddr),
		"local_key":     startResp.LocalKey,
		"transfer_size": startResp.TransferSize,
		"expected_crc":  fmt.Sprintf("0x%x", startResp.ExpectedCrc),
		"expires_at":    time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
		"pooled":        true,
	}).Debug("📖 RDMA read session started (pooled)")

	completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to complete RDMA read (pooled)")
		return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
	}

	duration := time.Since(startTime)

	if !completeResp.Success {
		errorMsg := "unknown error"
		if completeResp.Message != nil {
			errorMsg = *completeResp.Message
		}
		c.logger.WithFields(logrus.Fields{
			"session_id":      conn.sessionID,
			"rdma_session_id": startResp.SessionID,
			"error_message":   errorMsg,
			"pooled":          true,
		}).Error("❌ RDMA read completion failed (pooled)")
		return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
	}

	// Transfer rate in bytes/second. Guard against a zero elapsed time (coarse
	// clocks): dividing by it yields +Inf or NaN, which e.g. encoding/json
	// refuses to marshal.
	var transferRate float64
	if secs := duration.Seconds(); secs > 0 {
		transferRate = float64(startResp.TransferSize) / secs
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":      conn.sessionID,
		"rdma_session_id": startResp.SessionID,
		"bytes_read":      startResp.TransferSize,
		"duration":        duration,
		"transfer_rate":   transferRate,
		"server_crc":      completeResp.ServerCrc,
		"pooled":          true,
	}).Info("✅ RDMA read completed successfully (pooled)")

	// MOCK DATA: placeholder pattern (0,1,...,255,0,1,...) standing in for the
	// bytes a real RDMA transfer would deliver.
	mockData := make([]byte, startResp.TransferSize)
	for i := range mockData {
		mockData[i] = byte(i % 256) // Simple pattern for testing
	}

	return &ReadResponse{
		Data:         mockData,
		BytesRead:    startResp.TransferSize,
		Duration:     duration,
		TransferRate: transferRate,
		SessionID:    conn.sessionID,
		Success:      true,
		Message:      "RDMA read successful (pooled)",
	}, nil
}