Diffstat (limited to 'weed/mount')
-rw-r--r--  weed/mount/filehandle.go        63
-rw-r--r--  weed/mount/filehandle_read.go   67
-rw-r--r--  weed/mount/rdma_client.go      379
-rw-r--r--  weed/mount/weedfs.go            30
4 files changed, 538 insertions(+), 1 deletion(-)
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 6cbc9745e..d3836754f 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -31,6 +31,11 @@ type FileHandle struct {
isDeleted bool
+ // RDMA chunk offset cache for performance optimization
+ chunkOffsetCache []int64
+ chunkCacheValid bool
+ chunkCacheLock sync.RWMutex
+
// for debugging
mirrorFile *os.File
}
@@ -84,14 +89,25 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
glog.Fatalf("setting file handle entry to nil")
}
fh.entry.SetEntry(entry)
+
+ // Invalidate chunk offset cache since chunks may have changed
+ fh.invalidateChunkCache()
}
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
- return fh.entry.UpdateEntry(fn)
+ result := fh.entry.UpdateEntry(fn)
+
+ // Invalidate chunk offset cache since entry may have been modified
+ fh.invalidateChunkCache()
+
+ return result
}
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entry.AppendChunks(chunks)
+
+ // Invalidate chunk offset cache since new chunks were added
+ fh.invalidateChunkCache()
}
func (fh *FileHandle) ReleaseHandle() {
@@ -111,3 +127,48 @@ func lessThan(a, b *filer_pb.FileChunk) bool {
}
return a.ModifiedTsNs < b.ModifiedTsNs
}
+
+// getCumulativeOffsets returns cached cumulative offsets for chunks, computing them if necessary
+func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
+ fh.chunkCacheLock.RLock()
+ if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
+ // Cache is valid and matches current chunk count
+ result := make([]int64, len(fh.chunkOffsetCache))
+ copy(result, fh.chunkOffsetCache)
+ fh.chunkCacheLock.RUnlock()
+ return result
+ }
+ fh.chunkCacheLock.RUnlock()
+
+ // Need to compute/recompute cache
+ fh.chunkCacheLock.Lock()
+ defer fh.chunkCacheLock.Unlock()
+
+ // Double-check in case another goroutine computed it while we waited for the lock
+ if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
+ result := make([]int64, len(fh.chunkOffsetCache))
+ copy(result, fh.chunkOffsetCache)
+ return result
+ }
+
+ // Compute cumulative offsets
+ cumulativeOffsets := make([]int64, len(chunks)+1)
+ for i, chunk := range chunks {
+ cumulativeOffsets[i+1] = cumulativeOffsets[i] + int64(chunk.Size)
+ }
+
+ // Cache the result
+ fh.chunkOffsetCache = make([]int64, len(cumulativeOffsets))
+ copy(fh.chunkOffsetCache, cumulativeOffsets)
+ fh.chunkCacheValid = true
+
+ return cumulativeOffsets
+}
+
+// invalidateChunkCache invalidates the chunk offset cache when chunks are modified
+func (fh *FileHandle) invalidateChunkCache() {
+ fh.chunkCacheLock.Lock()
+ fh.chunkCacheValid = false
+ fh.chunkOffsetCache = nil
+ fh.chunkCacheLock.Unlock()
+}
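
Note on the cache layout in getCumulativeOffsets: the slice has len(chunks)+1 entries, where entry i is the starting file offset of chunk i and the final entry is the total size covered by the chunks. A minimal worked sketch (illustrative values only, not from the patch):

    // chunk sizes 4, 8, 2  ->  offsets [0, 4, 12, 14]
    sizes := []int64{4, 8, 2}
    offsets := make([]int64, len(sizes)+1)
    for i, s := range sizes {
        offsets[i+1] = offsets[i] + s
    }
    // a read at offset 5 lands in chunk 1 (5 < offsets[2]),
    // at intra-chunk offset 5 - offsets[1] = 1

Returning a copy of the cached slice under the read lock means callers never observe a slice that a concurrent invalidation is about to drop.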
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 87cf76655..88b020bf1 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "sort"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -64,6 +65,17 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return int64(totalRead), 0, nil
}
+ // Try RDMA acceleration first if available
+ if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled {
+ totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry)
+ if err == nil {
+ glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
+ return int64(totalRead), ts, nil
+ }
+ glog.V(4).Infof("RDMA read failed for %s, falling back to HTTP: %v", fileFullPath, err)
+ }
+
+ // Fall back to normal chunk reading
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
if err != nil && err != io.EOF {
@@ -75,6 +87,61 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return int64(totalRead), ts, err
}
+// tryRDMARead attempts to read file data using RDMA acceleration
+func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) {
+ // For now, we'll try to read the chunks directly using RDMA
+ // This is a simplified approach - in a full implementation, we'd need to
+ // handle chunk boundaries, multiple chunks, etc.
+
+ chunks := entry.GetEntry().Chunks
+ if len(chunks) == 0 {
+ return 0, 0, fmt.Errorf("no chunks available for RDMA read")
+ }
+
+ // Find the chunk that contains our offset using binary search
+ var targetChunk *filer_pb.FileChunk
+ var chunkOffset int64
+
+ // Get cached cumulative offsets for efficient binary search
+ cumulativeOffsets := fh.getCumulativeOffsets(chunks)
+
+ // Use binary search to find the chunk containing the offset
+ chunkIndex := sort.Search(len(chunks), func(i int) bool {
+ return offset < cumulativeOffsets[i+1]
+ })
+
+ // Verify the chunk actually contains our offset
+ if chunkIndex < len(chunks) && offset >= cumulativeOffsets[chunkIndex] {
+ targetChunk = chunks[chunkIndex]
+ chunkOffset = offset - cumulativeOffsets[chunkIndex]
+ }
+
+ if targetChunk == nil {
+ return 0, 0, fmt.Errorf("no chunk found for offset %d", offset)
+ }
+
+ // Calculate how much to read from this chunk
+ remainingInChunk := int64(targetChunk.Size) - chunkOffset
+ readSize := min(int64(len(buff)), remainingInChunk)
+
+ glog.V(4).Infof("RDMA read attempt: chunk=%s (fileId=%s), chunkOffset=%d, readSize=%d",
+ targetChunk.FileId, targetChunk.FileId, chunkOffset, readSize)
+
+ // Try RDMA read using file ID directly (more efficient)
+ data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(ctx, targetChunk.FileId, uint64(chunkOffset), uint64(readSize))
+ if err != nil {
+ return 0, 0, fmt.Errorf("RDMA read failed: %w", err)
+ }
+
+ if !isRDMA {
+ return 0, 0, fmt.Errorf("RDMA not available for chunk")
+ }
+
+ // Copy data to buffer
+ copied := copy(buff, data)
+ return int64(copied), targetChunk.ModifiedTsNs, nil
+}
+
func (fh *FileHandle) downloadRemoteEntry(entry *LockedEntry) error {
fileFullPath := fh.FullPath()
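
One behavioral note on tryRDMARead: it serves at most one chunk per call, so a read that spans a chunk boundary comes back short. A hypothetical wrapper that keeps retrying RDMA across chunk boundaries could look like this sketch (not part of the patch; it assumes the tryRDMARead signature above):

    read := int64(0)
    for read < int64(len(buff)) && offset+read < fileSize {
        n, _, err := fh.tryRDMARead(ctx, fileSize, buff[read:], offset+read, entry)
        if err != nil || n == 0 {
            break // fall back to entryChunkGroup.ReadDataAt for the rest
        }
        read += n
    }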
diff --git a/weed/mount/rdma_client.go b/weed/mount/rdma_client.go
new file mode 100644
index 000000000..19fa5b5bc
--- /dev/null
+++ b/weed/mount/rdma_client.go
@@ -0,0 +1,379 @@
+package mount
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+)
+
+// RDMAMountClient provides RDMA acceleration for SeaweedFS mount operations
+type RDMAMountClient struct {
+ sidecarAddr string
+ httpClient *http.Client
+ maxConcurrent int
+ timeout time.Duration
+ semaphore chan struct{}
+
+ // Volume lookup
+ lookupFileIdFn wdclient.LookupFileIdFunctionType
+
+ // Statistics
+ totalRequests int64
+ successfulReads int64
+ failedReads int64
+ totalBytesRead int64
+ totalLatencyNs int64
+}
+
+// RDMAReadRequest represents a request to read data via RDMA
+type RDMAReadRequest struct {
+ VolumeID uint32 `json:"volume_id"`
+ NeedleID uint64 `json:"needle_id"`
+ Cookie uint32 `json:"cookie"`
+ Offset uint64 `json:"offset"`
+ Size uint64 `json:"size"`
+}
+
+// RDMAReadResponse represents the response from an RDMA read operation
+type RDMAReadResponse struct {
+ Success bool `json:"success"`
+ IsRDMA bool `json:"is_rdma"`
+ Source string `json:"source"`
+ Duration string `json:"duration"`
+ DataSize int `json:"data_size"`
+ SessionID string `json:"session_id,omitempty"`
+ ErrorMsg string `json:"error,omitempty"`
+
+ // Zero-copy optimization fields
+ UseTempFile bool `json:"use_temp_file"`
+ TempFile string `json:"temp_file"`
+}
+
+// RDMAHealthResponse represents the health status of the RDMA sidecar
+type RDMAHealthResponse struct {
+ Status string `json:"status"`
+ RDMA struct {
+ Enabled bool `json:"enabled"`
+ Connected bool `json:"connected"`
+ } `json:"rdma"`
+ Timestamp string `json:"timestamp"`
+}
+
+// NewRDMAMountClient creates a new RDMA client for mount operations
+func NewRDMAMountClient(sidecarAddr string, lookupFileIdFn wdclient.LookupFileIdFunctionType, maxConcurrent int, timeoutMs int) (*RDMAMountClient, error) {
+ client := &RDMAMountClient{
+ sidecarAddr: sidecarAddr,
+ maxConcurrent: maxConcurrent,
+ timeout: time.Duration(timeoutMs) * time.Millisecond,
+ httpClient: &http.Client{
+ Timeout: time.Duration(timeoutMs) * time.Millisecond,
+ },
+ semaphore: make(chan struct{}, maxConcurrent),
+ lookupFileIdFn: lookupFileIdFn,
+ }
+
+ // Test connectivity and RDMA availability
+ if err := client.healthCheck(); err != nil {
+ return nil, fmt.Errorf("RDMA sidecar health check failed: %w", err)
+ }
+
+ glog.Infof("RDMA mount client initialized: sidecar=%s, maxConcurrent=%d, timeout=%v",
+ sidecarAddr, maxConcurrent, client.timeout)
+
+ return client, nil
+}
+
+// lookupVolumeLocationByFileID finds the best volume server for a given file ID
+func (c *RDMAMountClient) lookupVolumeLocationByFileID(ctx context.Context, fileID string) (string, error) {
+ glog.V(4).Infof("Looking up volume location for file ID %s", fileID)
+
+ targetUrls, err := c.lookupFileIdFn(ctx, fileID)
+ if err != nil {
+ return "", fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
+ }
+
+ if len(targetUrls) == 0 {
+ return "", fmt.Errorf("no locations found for file %s", fileID)
+ }
+
+ // Choose the first URL and extract the server address
+ targetUrl := targetUrls[0]
+ // Extract server address from URL like "http://server:port/fileId"
+ parts := strings.Split(targetUrl, "/")
+ if len(parts) < 3 {
+ return "", fmt.Errorf("invalid target URL format: %s", targetUrl)
+ }
+ bestAddress := fmt.Sprintf("http://%s", parts[2])
+
+ glog.V(4).Infof("File %s located at %s", fileID, bestAddress)
+ return bestAddress, nil
+}
+
+// lookupVolumeLocation finds the best volume server for a given volume ID (legacy method)
+func (c *RDMAMountClient) lookupVolumeLocation(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (string, error) {
+	// Create a file ID for lookup (SeaweedFS format: volumeId,needleIdHex+cookieHex)
+	fileID := fmt.Sprintf("%d,%x%08x", volumeID, needleID, cookie)
+ return c.lookupVolumeLocationByFileID(ctx, fileID)
+}
+
+// healthCheck verifies that the RDMA sidecar is available and functioning
+func (c *RDMAMountClient) healthCheck() error {
+ ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, "GET",
+ fmt.Sprintf("http://%s/health", c.sidecarAddr), nil)
+ if err != nil {
+ return fmt.Errorf("failed to create health check request: %w", err)
+ }
+
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("health check request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("health check failed with status: %s", resp.Status)
+ }
+
+ // Parse health response
+ var health RDMAHealthResponse
+ if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
+ return fmt.Errorf("failed to parse health response: %w", err)
+ }
+
+ if health.Status != "healthy" {
+ return fmt.Errorf("sidecar reports unhealthy status: %s", health.Status)
+ }
+
+ if !health.RDMA.Enabled {
+ return fmt.Errorf("RDMA is not enabled on sidecar")
+ }
+
+ if !health.RDMA.Connected {
+ glog.Warningf("RDMA sidecar is healthy but not connected to RDMA engine")
+ }
+
+ return nil
+}
+
+// ReadNeedle reads data from a specific needle using RDMA acceleration
+func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, size uint64) ([]byte, bool, error) {
+ // Acquire semaphore for concurrency control
+ select {
+ case c.semaphore <- struct{}{}:
+ defer func() { <-c.semaphore }()
+ case <-ctx.Done():
+ return nil, false, ctx.Err()
+ }
+
+ atomic.AddInt64(&c.totalRequests, 1)
+ startTime := time.Now()
+
+ // Lookup volume location using file ID directly
+ volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID)
+ if err != nil {
+ atomic.AddInt64(&c.failedReads, 1)
+ return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
+ }
+
+ // Prepare request URL with file_id parameter (simpler than individual components)
+	reqURL := fmt.Sprintf("http://%s/read?file_id=%s&offset=%d&size=%d&volume_server=%s",
+		c.sidecarAddr, url.QueryEscape(fileID), offset, size, url.QueryEscape(volumeServer))
+
+ req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
+ if err != nil {
+ atomic.AddInt64(&c.failedReads, 1)
+ return nil, false, fmt.Errorf("failed to create RDMA request: %w", err)
+ }
+
+ // Execute request
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ atomic.AddInt64(&c.failedReads, 1)
+ return nil, false, fmt.Errorf("RDMA request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ duration := time.Since(startTime)
+ atomic.AddInt64(&c.totalLatencyNs, duration.Nanoseconds())
+
+ if resp.StatusCode != http.StatusOK {
+ atomic.AddInt64(&c.failedReads, 1)
+ body, _ := io.ReadAll(resp.Body)
+ return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body))
+ }
+
+ // Check if response indicates RDMA was used
+ contentType := resp.Header.Get("Content-Type")
+ isRDMA := strings.Contains(resp.Header.Get("X-Source"), "rdma") ||
+ resp.Header.Get("X-RDMA-Used") == "true"
+
+ // Check for zero-copy temp file optimization
+ tempFilePath := resp.Header.Get("X-Temp-File")
+ useTempFile := resp.Header.Get("X-Use-Temp-File") == "true"
+
+ var data []byte
+
+ if useTempFile && tempFilePath != "" {
+ // Zero-copy path: read from temp file (page cache)
+		glog.V(4).Infof("🔥 Using zero-copy temp file: %s", tempFilePath)
+
+ // Allocate buffer for temp file read
+ var bufferSize uint64 = 1024 * 1024 // Default 1MB
+ if size > 0 {
+ bufferSize = size
+ }
+ buffer := make([]byte, bufferSize)
+
+	n, tempErr := c.readFromTempFile(tempFilePath, buffer)
+	if tempErr != nil {
+		glog.V(2).Infof("Zero-copy failed, falling back to HTTP body: %v", tempErr)
+		// Fall back to reading the HTTP body; assign the outer err so the
+		// shared error check below also covers a failed fallback read
+		data, err = io.ReadAll(resp.Body)
+	} else {
+		data = buffer[:n]
+		glog.V(4).Infof("🔥 Zero-copy successful: %d bytes from page cache", n)
+	}
+
+ // Important: Cleanup temp file after reading (consumer responsibility)
+ // This prevents accumulation of temp files in /tmp/rdma-cache
+ go c.cleanupTempFile(tempFilePath)
+ } else {
+ // Regular path: read from HTTP response body
+ data, err = io.ReadAll(resp.Body)
+ }
+
+ if err != nil {
+ atomic.AddInt64(&c.failedReads, 1)
+ return nil, false, fmt.Errorf("failed to read RDMA response: %w", err)
+ }
+
+ atomic.AddInt64(&c.successfulReads, 1)
+ atomic.AddInt64(&c.totalBytesRead, int64(len(data)))
+
+ // Log successful operation
+ glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s",
+ fileID, size, duration, isRDMA, contentType)
+
+ return data, isRDMA, nil
+}
+
+// cleanupTempFile requests cleanup of a temp file from the sidecar
+func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) {
+ if tempFilePath == "" {
+ return
+ }
+
+ // Give the page cache a brief moment to be utilized before cleanup
+ // This preserves the zero-copy performance window
+ time.Sleep(100 * time.Millisecond)
+
+ // Call sidecar cleanup endpoint
+ cleanupURL := fmt.Sprintf("http://%s/cleanup?temp_file=%s", c.sidecarAddr, url.QueryEscape(tempFilePath))
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, "DELETE", cleanupURL, nil)
+ if err != nil {
+ glog.V(2).Infof("Failed to create cleanup request for %s: %v", tempFilePath, err)
+ return
+ }
+
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ glog.V(2).Infof("Failed to cleanup temp file %s: %v", tempFilePath, err)
+ return
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
+		glog.V(4).Infof("🧹 Temp file cleaned up: %s", tempFilePath)
+ } else {
+ glog.V(2).Infof("Cleanup failed for %s: status %s", tempFilePath, resp.Status)
+ }
+}
+
+// GetStats returns current RDMA client statistics
+func (c *RDMAMountClient) GetStats() map[string]interface{} {
+ totalRequests := atomic.LoadInt64(&c.totalRequests)
+ successfulReads := atomic.LoadInt64(&c.successfulReads)
+ failedReads := atomic.LoadInt64(&c.failedReads)
+ totalBytesRead := atomic.LoadInt64(&c.totalBytesRead)
+ totalLatencyNs := atomic.LoadInt64(&c.totalLatencyNs)
+
+ successRate := float64(0)
+ avgLatencyNs := int64(0)
+
+ if totalRequests > 0 {
+ successRate = float64(successfulReads) / float64(totalRequests) * 100
+ avgLatencyNs = totalLatencyNs / totalRequests
+ }
+
+ return map[string]interface{}{
+ "sidecar_addr": c.sidecarAddr,
+ "max_concurrent": c.maxConcurrent,
+ "timeout_ms": int(c.timeout / time.Millisecond),
+ "total_requests": totalRequests,
+ "successful_reads": successfulReads,
+ "failed_reads": failedReads,
+ "success_rate_pct": fmt.Sprintf("%.1f", successRate),
+ "total_bytes_read": totalBytesRead,
+ "avg_latency_ns": avgLatencyNs,
+ "avg_latency_ms": fmt.Sprintf("%.3f", float64(avgLatencyNs)/1000000),
+ }
+}
+
+// Close shuts down the RDMA client and releases resources
+func (c *RDMAMountClient) Close() error {
+ // No need to close semaphore channel; closing it may cause panics if goroutines are still using it.
+ // The semaphore will be garbage collected when the client is no longer referenced.
+
+ // Log final statistics
+ stats := c.GetStats()
+ glog.Infof("RDMA mount client closing: %+v", stats)
+
+ return nil
+}
+
+// IsHealthy checks if the RDMA sidecar is currently healthy
+func (c *RDMAMountClient) IsHealthy() bool {
+ err := c.healthCheck()
+ return err == nil
+}
+
+// readFromTempFile performs zero-copy read from temp file using page cache
+func (c *RDMAMountClient) readFromTempFile(tempFilePath string, buffer []byte) (int, error) {
+ if tempFilePath == "" {
+ return 0, fmt.Errorf("empty temp file path")
+ }
+
+ // Open temp file for reading
+ file, err := os.Open(tempFilePath)
+ if err != nil {
+ return 0, fmt.Errorf("failed to open temp file %s: %w", tempFilePath, err)
+ }
+ defer file.Close()
+
+ // Read from temp file (this should be served from page cache)
+ n, err := file.Read(buffer)
+ if err != nil && err != io.EOF {
+ return n, fmt.Errorf("failed to read from temp file: %w", err)
+ }
+
+	glog.V(4).Infof("🔥 Zero-copy read: %d bytes from temp file %s", n, tempFilePath)
+
+ return n, nil
+}
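
For orientation, a standalone use of RDMAMountClient might look like the sketch below; the sidecar address, file ID, and lookup function are placeholders, and in the actual patch this wiring happens inside NewSeaweedFileSystem (next file):

    // lookupFn is assumed to be a wdclient.LookupFileIdFunctionType obtained from the filer client.
    client, err := NewRDMAMountClient("localhost:8081", lookupFn, 16, 5000)
    if err != nil {
        return fmt.Errorf("rdma sidecar unavailable: %w", err)
    }
    defer client.Close()

    data, viaRDMA, err := client.ReadNeedle(ctx, "3,01637037d6", 0, 4096)
    if err == nil {
        glog.V(2).Infof("read %d bytes, rdma=%v, stats=%v", len(data), viaRDMA, client.GetStats())
    }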
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 849b3ad0c..41896ff87 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -15,6 +15,7 @@ import (
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -62,6 +63,14 @@ type Option struct {
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
+ // RDMA acceleration options
+ RdmaEnabled bool
+ RdmaSidecarAddr string
+ RdmaFallback bool
+ RdmaReadOnly bool
+ RdmaMaxConcurrent int
+ RdmaTimeoutMs int
+
uniqueCacheDirForRead string
uniqueCacheDirForWrite string
}
@@ -86,6 +95,7 @@ type WFS struct {
fuseServer *fuse.Server
IsOverQuota bool
fhLockTable *util.LockTable[FileHandleId]
+ rdmaClient *RDMAMountClient
FilerConf *filer.FilerConf
}
@@ -138,8 +148,28 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.metaCache.Shutdown()
os.RemoveAll(option.getUniqueCacheDirForWrite())
os.RemoveAll(option.getUniqueCacheDirForRead())
+ if wfs.rdmaClient != nil {
+ wfs.rdmaClient.Close()
+ }
})
+ // Initialize RDMA client if enabled
+ if option.RdmaEnabled && option.RdmaSidecarAddr != "" {
+ rdmaClient, err := NewRDMAMountClient(
+ option.RdmaSidecarAddr,
+ wfs.LookupFn(),
+ option.RdmaMaxConcurrent,
+ option.RdmaTimeoutMs,
+ )
+ if err != nil {
+ glog.Warningf("Failed to initialize RDMA client: %v", err)
+ } else {
+ wfs.rdmaClient = rdmaClient
+ glog.Infof("RDMA acceleration enabled: sidecar=%s, maxConcurrent=%d, timeout=%dms",
+ option.RdmaSidecarAddr, option.RdmaMaxConcurrent, option.RdmaTimeoutMs)
+ }
+ }
+
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
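
Finally, a sketch of how a caller might populate the new Option fields when constructing the filesystem; the values and the one-to-one mapping from CLI flags are assumptions, since the flag plumbing is not part of this diff:

    opt := &mount.Option{
        // ... existing mount options ...
        RdmaEnabled:       true,
        RdmaSidecarAddr:   "localhost:8081",
        RdmaFallback:      true, // keep the HTTP read path as a fallback
        RdmaMaxConcurrent: 16,
        RdmaTimeoutMs:     5000,
    }
    wfs := mount.NewSeaweedFileSystem(opt)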