diff options
Diffstat (limited to 'weed/mount/rdma_client.go')
| -rw-r--r-- | weed/mount/rdma_client.go | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/weed/mount/rdma_client.go b/weed/mount/rdma_client.go new file mode 100644 index 000000000..19fa5b5bc --- /dev/null +++ b/weed/mount/rdma_client.go @@ -0,0 +1,379 @@ +package mount + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// RDMAMountClient provides RDMA acceleration for SeaweedFS mount operations +type RDMAMountClient struct { + sidecarAddr string + httpClient *http.Client + maxConcurrent int + timeout time.Duration + semaphore chan struct{} + + // Volume lookup + lookupFileIdFn wdclient.LookupFileIdFunctionType + + // Statistics + totalRequests int64 + successfulReads int64 + failedReads int64 + totalBytesRead int64 + totalLatencyNs int64 +} + +// RDMAReadRequest represents a request to read data via RDMA +type RDMAReadRequest struct { + VolumeID uint32 `json:"volume_id"` + NeedleID uint64 `json:"needle_id"` + Cookie uint32 `json:"cookie"` + Offset uint64 `json:"offset"` + Size uint64 `json:"size"` +} + +// RDMAReadResponse represents the response from an RDMA read operation +type RDMAReadResponse struct { + Success bool `json:"success"` + IsRDMA bool `json:"is_rdma"` + Source string `json:"source"` + Duration string `json:"duration"` + DataSize int `json:"data_size"` + SessionID string `json:"session_id,omitempty"` + ErrorMsg string `json:"error,omitempty"` + + // Zero-copy optimization fields + UseTempFile bool `json:"use_temp_file"` + TempFile string `json:"temp_file"` +} + +// RDMAHealthResponse represents the health status of the RDMA sidecar +type RDMAHealthResponse struct { + Status string `json:"status"` + RDMA struct { + Enabled bool `json:"enabled"` + Connected bool `json:"connected"` + } `json:"rdma"` + Timestamp string `json:"timestamp"` +} + +// NewRDMAMountClient creates a new RDMA client for mount operations +func NewRDMAMountClient(sidecarAddr string, lookupFileIdFn wdclient.LookupFileIdFunctionType, maxConcurrent int, timeoutMs int) (*RDMAMountClient, error) { + client := &RDMAMountClient{ + sidecarAddr: sidecarAddr, + maxConcurrent: maxConcurrent, + timeout: time.Duration(timeoutMs) * time.Millisecond, + httpClient: &http.Client{ + Timeout: time.Duration(timeoutMs) * time.Millisecond, + }, + semaphore: make(chan struct{}, maxConcurrent), + lookupFileIdFn: lookupFileIdFn, + } + + // Test connectivity and RDMA availability + if err := client.healthCheck(); err != nil { + return nil, fmt.Errorf("RDMA sidecar health check failed: %w", err) + } + + glog.Infof("RDMA mount client initialized: sidecar=%s, maxConcurrent=%d, timeout=%v", + sidecarAddr, maxConcurrent, client.timeout) + + return client, nil +} + +// lookupVolumeLocationByFileID finds the best volume server for a given file ID +func (c *RDMAMountClient) lookupVolumeLocationByFileID(ctx context.Context, fileID string) (string, error) { + glog.V(4).Infof("Looking up volume location for file ID %s", fileID) + + targetUrls, err := c.lookupFileIdFn(ctx, fileID) + if err != nil { + return "", fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err) + } + + if len(targetUrls) == 0 { + return "", fmt.Errorf("no locations found for file %s", fileID) + } + + // Choose the first URL and extract the server address + targetUrl := targetUrls[0] + // Extract server address from URL like "http://server:port/fileId" + parts := strings.Split(targetUrl, "/") + if len(parts) < 3 { + return "", fmt.Errorf("invalid target URL format: %s", targetUrl) + } + bestAddress := fmt.Sprintf("http://%s", parts[2]) + + glog.V(4).Infof("File %s located at %s", fileID, bestAddress) + return bestAddress, nil +} + +// lookupVolumeLocation finds the best volume server for a given volume ID (legacy method) +func (c *RDMAMountClient) lookupVolumeLocation(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (string, error) { + // Create a file ID for lookup (format: volumeId,needleId,cookie) + fileID := fmt.Sprintf("%d,%x,%d", volumeID, needleID, cookie) + return c.lookupVolumeLocationByFileID(ctx, fileID) +} + +// healthCheck verifies that the RDMA sidecar is available and functioning +func (c *RDMAMountClient) healthCheck() error { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", + fmt.Sprintf("http://%s/health", c.sidecarAddr), nil) + if err != nil { + return fmt.Errorf("failed to create health check request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("health check request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("health check failed with status: %s", resp.Status) + } + + // Parse health response + var health RDMAHealthResponse + if err := json.NewDecoder(resp.Body).Decode(&health); err != nil { + return fmt.Errorf("failed to parse health response: %w", err) + } + + if health.Status != "healthy" { + return fmt.Errorf("sidecar reports unhealthy status: %s", health.Status) + } + + if !health.RDMA.Enabled { + return fmt.Errorf("RDMA is not enabled on sidecar") + } + + if !health.RDMA.Connected { + glog.Warningf("RDMA sidecar is healthy but not connected to RDMA engine") + } + + return nil +} + +// ReadNeedle reads data from a specific needle using RDMA acceleration +func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, size uint64) ([]byte, bool, error) { + // Acquire semaphore for concurrency control + select { + case c.semaphore <- struct{}{}: + defer func() { <-c.semaphore }() + case <-ctx.Done(): + return nil, false, ctx.Err() + } + + atomic.AddInt64(&c.totalRequests, 1) + startTime := time.Now() + + // Lookup volume location using file ID directly + volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID) + if err != nil { + atomic.AddInt64(&c.failedReads, 1) + return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err) + } + + // Prepare request URL with file_id parameter (simpler than individual components) + reqURL := fmt.Sprintf("http://%s/read?file_id=%s&offset=%d&size=%d&volume_server=%s", + c.sidecarAddr, fileID, offset, size, volumeServer) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + atomic.AddInt64(&c.failedReads, 1) + return nil, false, fmt.Errorf("failed to create RDMA request: %w", err) + } + + // Execute request + resp, err := c.httpClient.Do(req) + if err != nil { + atomic.AddInt64(&c.failedReads, 1) + return nil, false, fmt.Errorf("RDMA request failed: %w", err) + } + defer resp.Body.Close() + + duration := time.Since(startTime) + atomic.AddInt64(&c.totalLatencyNs, duration.Nanoseconds()) + + if resp.StatusCode != http.StatusOK { + atomic.AddInt64(&c.failedReads, 1) + body, _ := io.ReadAll(resp.Body) + return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body)) + } + + // Check if response indicates RDMA was used + contentType := resp.Header.Get("Content-Type") + isRDMA := strings.Contains(resp.Header.Get("X-Source"), "rdma") || + resp.Header.Get("X-RDMA-Used") == "true" + + // Check for zero-copy temp file optimization + tempFilePath := resp.Header.Get("X-Temp-File") + useTempFile := resp.Header.Get("X-Use-Temp-File") == "true" + + var data []byte + + if useTempFile && tempFilePath != "" { + // Zero-copy path: read from temp file (page cache) + glog.V(4).Infof("๐ฅ Using zero-copy temp file: %s", tempFilePath) + + // Allocate buffer for temp file read + var bufferSize uint64 = 1024 * 1024 // Default 1MB + if size > 0 { + bufferSize = size + } + buffer := make([]byte, bufferSize) + + n, err := c.readFromTempFile(tempFilePath, buffer) + if err != nil { + glog.V(2).Infof("Zero-copy failed, falling back to HTTP body: %v", err) + // Fall back to reading HTTP body + data, err = io.ReadAll(resp.Body) + } else { + data = buffer[:n] + glog.V(4).Infof("๐ฅ Zero-copy successful: %d bytes from page cache", n) + } + + // Important: Cleanup temp file after reading (consumer responsibility) + // This prevents accumulation of temp files in /tmp/rdma-cache + go c.cleanupTempFile(tempFilePath) + } else { + // Regular path: read from HTTP response body + data, err = io.ReadAll(resp.Body) + } + + if err != nil { + atomic.AddInt64(&c.failedReads, 1) + return nil, false, fmt.Errorf("failed to read RDMA response: %w", err) + } + + atomic.AddInt64(&c.successfulReads, 1) + atomic.AddInt64(&c.totalBytesRead, int64(len(data))) + + // Log successful operation + glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s", + fileID, size, duration, isRDMA, contentType) + + return data, isRDMA, nil +} + +// cleanupTempFile requests cleanup of a temp file from the sidecar +func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) { + if tempFilePath == "" { + return + } + + // Give the page cache a brief moment to be utilized before cleanup + // This preserves the zero-copy performance window + time.Sleep(100 * time.Millisecond) + + // Call sidecar cleanup endpoint + cleanupURL := fmt.Sprintf("http://%s/cleanup?temp_file=%s", c.sidecarAddr, url.QueryEscape(tempFilePath)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "DELETE", cleanupURL, nil) + if err != nil { + glog.V(2).Infof("Failed to create cleanup request for %s: %v", tempFilePath, err) + return + } + + resp, err := c.httpClient.Do(req) + if err != nil { + glog.V(2).Infof("Failed to cleanup temp file %s: %v", tempFilePath, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + glog.V(4).Infof("๐งน Temp file cleaned up: %s", tempFilePath) + } else { + glog.V(2).Infof("Cleanup failed for %s: status %s", tempFilePath, resp.Status) + } +} + +// GetStats returns current RDMA client statistics +func (c *RDMAMountClient) GetStats() map[string]interface{} { + totalRequests := atomic.LoadInt64(&c.totalRequests) + successfulReads := atomic.LoadInt64(&c.successfulReads) + failedReads := atomic.LoadInt64(&c.failedReads) + totalBytesRead := atomic.LoadInt64(&c.totalBytesRead) + totalLatencyNs := atomic.LoadInt64(&c.totalLatencyNs) + + successRate := float64(0) + avgLatencyNs := int64(0) + + if totalRequests > 0 { + successRate = float64(successfulReads) / float64(totalRequests) * 100 + avgLatencyNs = totalLatencyNs / totalRequests + } + + return map[string]interface{}{ + "sidecar_addr": c.sidecarAddr, + "max_concurrent": c.maxConcurrent, + "timeout_ms": int(c.timeout / time.Millisecond), + "total_requests": totalRequests, + "successful_reads": successfulReads, + "failed_reads": failedReads, + "success_rate_pct": fmt.Sprintf("%.1f", successRate), + "total_bytes_read": totalBytesRead, + "avg_latency_ns": avgLatencyNs, + "avg_latency_ms": fmt.Sprintf("%.3f", float64(avgLatencyNs)/1000000), + } +} + +// Close shuts down the RDMA client and releases resources +func (c *RDMAMountClient) Close() error { + // No need to close semaphore channel; closing it may cause panics if goroutines are still using it. + // The semaphore will be garbage collected when the client is no longer referenced. + + // Log final statistics + stats := c.GetStats() + glog.Infof("RDMA mount client closing: %+v", stats) + + return nil +} + +// IsHealthy checks if the RDMA sidecar is currently healthy +func (c *RDMAMountClient) IsHealthy() bool { + err := c.healthCheck() + return err == nil +} + +// readFromTempFile performs zero-copy read from temp file using page cache +func (c *RDMAMountClient) readFromTempFile(tempFilePath string, buffer []byte) (int, error) { + if tempFilePath == "" { + return 0, fmt.Errorf("empty temp file path") + } + + // Open temp file for reading + file, err := os.Open(tempFilePath) + if err != nil { + return 0, fmt.Errorf("failed to open temp file %s: %w", tempFilePath, err) + } + defer file.Close() + + // Read from temp file (this should be served from page cache) + n, err := file.Read(buffer) + if err != nil && err != io.EOF { + return n, fmt.Errorf("failed to read from temp file: %w", err) + } + + glog.V(4).Infof("๐ฅ Zero-copy read: %d bytes from temp file %s", n, tempFilePath) + + return n, nil +} |
