Diffstat (limited to 'weed')
-rw-r--r-- | weed/command/mount.go         |  29
-rw-r--r-- | weed/command/mount_std.go     |   7
-rw-r--r-- | weed/mount/filehandle.go      |  63
-rw-r--r-- | weed/mount/filehandle_read.go |  67
-rw-r--r-- | weed/mount/rdma_client.go     | 379
-rw-r--r-- | weed/mount/weedfs.go          |  30
6 files changed, 574 insertions, 1 deletion
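For orientation before the per-file diff: this change adds client-side RDMA read acceleration to the FUSE mount, gated behind new -rdma.* flags and delegated to an external RDMA sidecar process. Per the help text added to weed/command/mount.go below, a typical invocation against a sidecar listening on localhost:8081 would look like the following (both addresses are placeholders; the -rdma.fallback, -rdma.maxConcurrent, and -rdma.timeoutMs values shown are simply the flag defaults):

    weed mount -filer=localhost:8888 -dir=/mnt/seaweedfs \
      -rdma.enabled=true -rdma.sidecar=localhost:8081 \
      -rdma.fallback=true -rdma.maxConcurrent=64 -rdma.timeoutMs=5000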
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 21e49f236..98f139c6f 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -35,6 +35,14 @@ type MountOptions struct {
 	disableXAttr      *bool
 	extraOptions      []string
 	fuseCommandPid    int
+
+	// RDMA acceleration options
+	rdmaEnabled       *bool
+	rdmaSidecarAddr   *string
+	rdmaFallback      *bool
+	rdmaReadOnly      *bool
+	rdmaMaxConcurrent *int
+	rdmaTimeoutMs     *int
 }
 
 var (
@@ -75,6 +83,14 @@ func init() {
 	mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr")
 	mountOptions.fuseCommandPid = 0
 
+	// RDMA acceleration flags
+	mountOptions.rdmaEnabled = cmdMount.Flag.Bool("rdma.enabled", false, "enable RDMA acceleration for reads")
+	mountOptions.rdmaSidecarAddr = cmdMount.Flag.String("rdma.sidecar", "", "RDMA sidecar address (e.g., localhost:8081)")
+	mountOptions.rdmaFallback = cmdMount.Flag.Bool("rdma.fallback", true, "fallback to HTTP when RDMA fails")
+	mountOptions.rdmaReadOnly = cmdMount.Flag.Bool("rdma.readOnly", false, "use RDMA for reads only (writes use HTTP)")
+	mountOptions.rdmaMaxConcurrent = cmdMount.Flag.Int("rdma.maxConcurrent", 64, "max concurrent RDMA operations")
+	mountOptions.rdmaTimeoutMs = cmdMount.Flag.Int("rdma.timeoutMs", 5000, "RDMA operation timeout in milliseconds")
+
 	mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
 	mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
 	mountReadRetryTime = cmdMount.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
@@ -95,5 +111,18 @@ var cmdMount = &Command{
 
   On OS X, it requires OSXFUSE (https://osxfuse.github.io/).
 
+  RDMA Acceleration:
+  For ultra-fast reads, enable RDMA acceleration with an RDMA sidecar:
+    weed mount -filer=localhost:8888 -dir=/mnt/seaweedfs \
+      -rdma.enabled=true -rdma.sidecar=localhost:8081
+
+  RDMA Options:
+    -rdma.enabled=false       Enable RDMA acceleration for reads
+    -rdma.sidecar=""          RDMA sidecar address (required if enabled)
+    -rdma.fallback=true       Fallback to HTTP when RDMA fails
+    -rdma.readOnly=false      Use RDMA for reads only (writes use HTTP)
+    -rdma.maxConcurrent=64    Max concurrent RDMA operations
+    -rdma.timeoutMs=5000      RDMA operation timeout in milliseconds
+
   `,
 }
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 588d38ce4..53b09589d 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -253,6 +253,13 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		UidGidMapper: uidGidMapper,
 		DisableXAttr: *option.disableXAttr,
 		IsMacOs:      runtime.GOOS == "darwin",
+		// RDMA acceleration options
+		RdmaEnabled:       *option.rdmaEnabled,
+		RdmaSidecarAddr:   *option.rdmaSidecarAddr,
+		RdmaFallback:      *option.rdmaFallback,
+		RdmaReadOnly:      *option.rdmaReadOnly,
+		RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
+		RdmaTimeoutMs:     *option.rdmaTimeoutMs,
 	})
 
 	// create mount root
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 6cbc9745e..d3836754f 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -31,6 +31,11 @@ type FileHandle struct {
 
 	isDeleted bool
 
+	// RDMA chunk offset cache for performance optimization
+	chunkOffsetCache []int64
+	chunkCacheValid  bool
+	chunkCacheLock   sync.RWMutex
+
 	// for debugging
 	mirrorFile *os.File
 }
@@ -84,14 +89,25 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
 		glog.Fatalf("setting file handle entry to nil")
 	}
 	fh.entry.SetEntry(entry)
+
+	// Invalidate chunk offset cache since chunks may have changed
+	fh.invalidateChunkCache()
 }
 
 func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
-	return fh.entry.UpdateEntry(fn)
+	result := fh.entry.UpdateEntry(fn)
+
+	// Invalidate chunk offset cache since entry may have been modified
+	fh.invalidateChunkCache()
+
+	return result
 }
 
 func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
 	fh.entry.AppendChunks(chunks)
+
+	// Invalidate chunk offset cache since new chunks were added
+	fh.invalidateChunkCache()
 }
 
 func (fh *FileHandle) ReleaseHandle() {
@@ -111,3 +127,48 @@ func lessThan(a, b *filer_pb.FileChunk) bool {
 	}
 	return a.ModifiedTsNs < b.ModifiedTsNs
 }
+
+// getCumulativeOffsets returns cached cumulative offsets for chunks, computing them if necessary
+func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
+	fh.chunkCacheLock.RLock()
+	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
+		// Cache is valid and matches current chunk count
+		result := make([]int64, len(fh.chunkOffsetCache))
+		copy(result, fh.chunkOffsetCache)
+		fh.chunkCacheLock.RUnlock()
+		return result
+	}
+	fh.chunkCacheLock.RUnlock()
+
+	// Need to compute/recompute cache
+	fh.chunkCacheLock.Lock()
+	defer fh.chunkCacheLock.Unlock()
+
+	// Double-check in case another goroutine computed it while we waited for the lock
+	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
+		result := make([]int64, len(fh.chunkOffsetCache))
+		copy(result, fh.chunkOffsetCache)
+		return result
+	}
+
+	// Compute cumulative offsets
+	cumulativeOffsets := make([]int64, len(chunks)+1)
+	for i, chunk := range chunks {
+		cumulativeOffsets[i+1] = cumulativeOffsets[i] + int64(chunk.Size)
+	}
+
+	// Cache the result
+	fh.chunkOffsetCache = make([]int64, len(cumulativeOffsets))
+	copy(fh.chunkOffsetCache, cumulativeOffsets)
+	fh.chunkCacheValid = true
+
+	return cumulativeOffsets
+}
+
+// invalidateChunkCache invalidates the chunk offset cache when chunks are modified
+func (fh *FileHandle) invalidateChunkCache() {
+	fh.chunkCacheLock.Lock()
+	fh.chunkCacheValid = false
+	fh.chunkOffsetCache = nil
+	fh.chunkCacheLock.Unlock()
+}
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 87cf76655..88b020bf1 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sort"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -64,6 +65,17 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
 		return int64(totalRead), 0, nil
 	}
 
+	// Try RDMA acceleration first if available
+	if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled {
+		totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry)
+		if err == nil {
+			glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
+			return int64(totalRead), ts, nil
+		}
+		glog.V(4).Infof("RDMA read failed for %s, falling back to HTTP: %v", fileFullPath, err)
+	}
+
+	// Fall back to normal chunk reading
 	totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
 
 	if err != nil && err != io.EOF {
@@ -75,6 +87,61 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
 	return int64(totalRead), ts, err
 }
 
+// tryRDMARead attempts to read file data using RDMA acceleration
+func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) {
+	// For now, we'll try to read the chunks directly using RDMA
+	// This is a simplified approach - in a full implementation, we'd need to
+	// handle chunk boundaries, multiple chunks, etc.
+
+	chunks := entry.GetEntry().Chunks
+	if len(chunks) == 0 {
+		return 0, 0, fmt.Errorf("no chunks available for RDMA read")
+	}
+
+	// Find the chunk that contains our offset using binary search
+	var targetChunk *filer_pb.FileChunk
+	var chunkOffset int64
+
+	// Get cached cumulative offsets for efficient binary search
+	cumulativeOffsets := fh.getCumulativeOffsets(chunks)
+
+	// Use binary search to find the chunk containing the offset
+	chunkIndex := sort.Search(len(chunks), func(i int) bool {
+		return offset < cumulativeOffsets[i+1]
+	})
+
+	// Verify the chunk actually contains our offset
+	if chunkIndex < len(chunks) && offset >= cumulativeOffsets[chunkIndex] {
+		targetChunk = chunks[chunkIndex]
+		chunkOffset = offset - cumulativeOffsets[chunkIndex]
+	}
+
+	if targetChunk == nil {
+		return 0, 0, fmt.Errorf("no chunk found for offset %d", offset)
+	}
+
+	// Calculate how much to read from this chunk
+	remainingInChunk := int64(targetChunk.Size) - chunkOffset
+	readSize := min(int64(len(buff)), remainingInChunk)
+
+	glog.V(4).Infof("RDMA read attempt: chunk=%s (fileId=%s), chunkOffset=%d, readSize=%d",
+		targetChunk.FileId, targetChunk.FileId, chunkOffset, readSize)
+
+	// Try RDMA read using file ID directly (more efficient)
+	data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(ctx, targetChunk.FileId, uint64(chunkOffset), uint64(readSize))
+	if err != nil {
+		return 0, 0, fmt.Errorf("RDMA read failed: %w", err)
+	}
+
+	if !isRDMA {
+		return 0, 0, fmt.Errorf("RDMA not available for chunk")
+	}
+
+	// Copy data to buffer
+	copied := copy(buff, data)
+	return int64(copied), targetChunk.ModifiedTsNs, nil
+}
+
 func (fh *FileHandle) downloadRemoteEntry(entry *LockedEntry) error {
 	fileFullPath := fh.FullPath()
diff --git a/weed/mount/rdma_client.go b/weed/mount/rdma_client.go
new file mode 100644
index 000000000..19fa5b5bc
--- /dev/null
+++ b/weed/mount/rdma_client.go
@@ -0,0 +1,379 @@
+package mount
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
+)
+
+// RDMAMountClient provides RDMA acceleration for SeaweedFS mount operations
+type RDMAMountClient struct {
+	sidecarAddr   string
+	httpClient    *http.Client
+	maxConcurrent int
+	timeout       time.Duration
+	semaphore     chan struct{}
+
+	// Volume lookup
+	lookupFileIdFn wdclient.LookupFileIdFunctionType
+
+	// Statistics
+	totalRequests   int64
+	successfulReads int64
+	failedReads     int64
+	totalBytesRead  int64
+	totalLatencyNs  int64
+}
+
+// RDMAReadRequest represents a request to read data via RDMA
+type RDMAReadRequest struct {
+	VolumeID uint32 `json:"volume_id"`
+	NeedleID uint64 `json:"needle_id"`
+	Cookie   uint32 `json:"cookie"`
+	Offset   uint64 `json:"offset"`
+	Size     uint64 `json:"size"`
+}
+
+// RDMAReadResponse represents the response from an RDMA read operation
+type RDMAReadResponse struct {
+	Success   bool   `json:"success"`
+	IsRDMA    bool   `json:"is_rdma"`
+	Source    string `json:"source"`
+	Duration  string `json:"duration"`
+	DataSize  int    `json:"data_size"`
+	SessionID string `json:"session_id,omitempty"`
+	ErrorMsg  string `json:"error,omitempty"`
+
+	// Zero-copy optimization fields
+	UseTempFile bool   `json:"use_temp_file"`
+	TempFile    string `json:"temp_file"`
+}
+
+// RDMAHealthResponse represents the health status of the RDMA sidecar
+type RDMAHealthResponse struct {
+	Status string `json:"status"`
+	RDMA   struct {
+		Enabled   bool `json:"enabled"`
+		Connected bool `json:"connected"`
+	} `json:"rdma"`
+	Timestamp string `json:"timestamp"`
+}
+
+// NewRDMAMountClient creates a new RDMA client for mount operations
+func NewRDMAMountClient(sidecarAddr string, lookupFileIdFn wdclient.LookupFileIdFunctionType, maxConcurrent int, timeoutMs int) (*RDMAMountClient, error) {
+	client := &RDMAMountClient{
+		sidecarAddr:   sidecarAddr,
+		maxConcurrent: maxConcurrent,
+		timeout:       time.Duration(timeoutMs) * time.Millisecond,
+		httpClient: &http.Client{
+			Timeout: time.Duration(timeoutMs) * time.Millisecond,
+		},
+		semaphore:      make(chan struct{}, maxConcurrent),
+		lookupFileIdFn: lookupFileIdFn,
+	}
+
+	// Test connectivity and RDMA availability
+	if err := client.healthCheck(); err != nil {
+		return nil, fmt.Errorf("RDMA sidecar health check failed: %w", err)
+	}
+
+	glog.Infof("RDMA mount client initialized: sidecar=%s, maxConcurrent=%d, timeout=%v",
+		sidecarAddr, maxConcurrent, client.timeout)
+
+	return client, nil
+}
+
+// lookupVolumeLocationByFileID finds the best volume server for a given file ID
+func (c *RDMAMountClient) lookupVolumeLocationByFileID(ctx context.Context, fileID string) (string, error) {
+	glog.V(4).Infof("Looking up volume location for file ID %s", fileID)
+
+	targetUrls, err := c.lookupFileIdFn(ctx, fileID)
+	if err != nil {
+		return "", fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
+	}
+
+	if len(targetUrls) == 0 {
+		return "", fmt.Errorf("no locations found for file %s", fileID)
+	}
+
+	// Choose the first URL and extract the server address
+	targetUrl := targetUrls[0]
+	// Extract server address from URL like "http://server:port/fileId"
+	parts := strings.Split(targetUrl, "/")
+	if len(parts) < 3 {
+		return "", fmt.Errorf("invalid target URL format: %s", targetUrl)
+	}
+	bestAddress := fmt.Sprintf("http://%s", parts[2])
+
+	glog.V(4).Infof("File %s located at %s", fileID, bestAddress)
+	return bestAddress, nil
+}
+
+// lookupVolumeLocation finds the best volume server for a given volume ID (legacy method)
+func (c *RDMAMountClient) lookupVolumeLocation(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (string, error) {
+	// Create a file ID for lookup (format: volumeId,needleId,cookie)
+	fileID := fmt.Sprintf("%d,%x,%d", volumeID, needleID, cookie)
+	return c.lookupVolumeLocationByFileID(ctx, fileID)
+}
+
+// healthCheck verifies that the RDMA sidecar is available and functioning
+func (c *RDMAMountClient) healthCheck() error {
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, "GET",
+		fmt.Sprintf("http://%s/health", c.sidecarAddr), nil)
+	if err != nil {
+		return fmt.Errorf("failed to create health check request: %w", err)
+	}
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("health check request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("health check failed with status: %s", resp.Status)
+	}
+
+	// Parse health response
+	var health RDMAHealthResponse
+	if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
+		return fmt.Errorf("failed to parse health response: %w", err)
+	}
+
+	if health.Status != "healthy" {
+		return fmt.Errorf("sidecar reports unhealthy status: %s", health.Status)
+	}
+
+	if !health.RDMA.Enabled {
+		return fmt.Errorf("RDMA is not enabled on sidecar")
+	}
+
+	if !health.RDMA.Connected {
+		glog.Warningf("RDMA sidecar is healthy but not connected to RDMA engine")
+	}
+
+	return nil
+}
+
+// ReadNeedle reads data from a specific needle using RDMA acceleration
+func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, size uint64) ([]byte, bool, error) {
+	// Acquire semaphore for concurrency control
+	select {
+	case c.semaphore <- struct{}{}:
+		defer func() { <-c.semaphore }()
+	case <-ctx.Done():
+		return nil, false, ctx.Err()
+	}
+
+	atomic.AddInt64(&c.totalRequests, 1)
+	startTime := time.Now()
+
+	// Lookup volume location using file ID directly
+	volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID)
+	if err != nil {
+		atomic.AddInt64(&c.failedReads, 1)
+		return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
+	}
+
+	// Prepare request URL with file_id parameter (simpler than individual components)
+	reqURL := fmt.Sprintf("http://%s/read?file_id=%s&offset=%d&size=%d&volume_server=%s",
+		c.sidecarAddr, fileID, offset, size, volumeServer)
+
+	req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
+	if err != nil {
+		atomic.AddInt64(&c.failedReads, 1)
+		return nil, false, fmt.Errorf("failed to create RDMA request: %w", err)
+	}
+
+	// Execute request
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		atomic.AddInt64(&c.failedReads, 1)
+		return nil, false, fmt.Errorf("RDMA request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	duration := time.Since(startTime)
+	atomic.AddInt64(&c.totalLatencyNs, duration.Nanoseconds())
+
+	if resp.StatusCode != http.StatusOK {
+		atomic.AddInt64(&c.failedReads, 1)
+		body, _ := io.ReadAll(resp.Body)
+		return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body))
+	}
+
+	// Check if response indicates RDMA was used
+	contentType := resp.Header.Get("Content-Type")
+	isRDMA := strings.Contains(resp.Header.Get("X-Source"), "rdma") ||
+		resp.Header.Get("X-RDMA-Used") == "true"
+
+	// Check for zero-copy temp file optimization
+	tempFilePath := resp.Header.Get("X-Temp-File")
+	useTempFile := resp.Header.Get("X-Use-Temp-File") == "true"
+
+	var data []byte
+
+	if useTempFile && tempFilePath != "" {
+		// Zero-copy path: read from temp file (page cache)
+		glog.V(4).Infof("Using zero-copy temp file: %s", tempFilePath)
+
+		// Allocate buffer for temp file read
+		var bufferSize uint64 = 1024 * 1024 // Default 1MB
+		if size > 0 {
+			bufferSize = size
+		}
+		buffer := make([]byte, bufferSize)
+
+		n, err := c.readFromTempFile(tempFilePath, buffer)
+		if err != nil {
+			glog.V(2).Infof("Zero-copy failed, falling back to HTTP body: %v", err)
+			// Fall back to reading HTTP body
+			data, err = io.ReadAll(resp.Body)
+		} else {
+			data = buffer[:n]
+			glog.V(4).Infof("Zero-copy successful: %d bytes from page cache", n)
+		}
+
+		// Important: Cleanup temp file after reading (consumer responsibility)
+		// This prevents accumulation of temp files in /tmp/rdma-cache
+		go c.cleanupTempFile(tempFilePath)
+	} else {
+		// Regular path: read from HTTP response body
+		data, err = io.ReadAll(resp.Body)
+	}
+
+	if err != nil {
+		atomic.AddInt64(&c.failedReads, 1)
+		return nil, false, fmt.Errorf("failed to read RDMA response: %w", err)
+	}
+
+	atomic.AddInt64(&c.successfulReads, 1)
+	atomic.AddInt64(&c.totalBytesRead, int64(len(data)))
+
+	// Log successful operation
+	glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s",
+		fileID, size, duration, isRDMA, contentType)
+
+	return data, isRDMA, nil
+}
+
+// cleanupTempFile requests cleanup of a temp file from the sidecar
+func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) {
+	if tempFilePath == "" {
+		return
+	}
+
+	// Give the page cache a brief moment to be utilized before cleanup
+	// This preserves the zero-copy performance window
+	time.Sleep(100 * time.Millisecond)
+
+	// Call sidecar cleanup endpoint
+	cleanupURL := fmt.Sprintf("http://%s/cleanup?temp_file=%s", c.sidecarAddr, url.QueryEscape(tempFilePath))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, "DELETE", cleanupURL, nil)
+	if err != nil {
+		glog.V(2).Infof("Failed to create cleanup request for %s: %v", tempFilePath, err)
+		return
+	}
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		glog.V(2).Infof("Failed to cleanup temp file %s: %v", tempFilePath, err)
+		return
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusOK {
+		glog.V(4).Infof("Temp file cleaned up: %s", tempFilePath)
+	} else {
+		glog.V(2).Infof("Cleanup failed for %s: status %s", tempFilePath, resp.Status)
+	}
+}
+
+// GetStats returns current RDMA client statistics
+func (c *RDMAMountClient) GetStats() map[string]interface{} {
+	totalRequests := atomic.LoadInt64(&c.totalRequests)
+	successfulReads := atomic.LoadInt64(&c.successfulReads)
+	failedReads := atomic.LoadInt64(&c.failedReads)
+	totalBytesRead := atomic.LoadInt64(&c.totalBytesRead)
+	totalLatencyNs := atomic.LoadInt64(&c.totalLatencyNs)
+
+	successRate := float64(0)
+	avgLatencyNs := int64(0)
+
+	if totalRequests > 0 {
+		successRate = float64(successfulReads) / float64(totalRequests) * 100
+		avgLatencyNs = totalLatencyNs / totalRequests
+	}
+
+	return map[string]interface{}{
+		"sidecar_addr":     c.sidecarAddr,
+		"max_concurrent":   c.maxConcurrent,
+		"timeout_ms":       int(c.timeout / time.Millisecond),
+		"total_requests":   totalRequests,
+		"successful_reads": successfulReads,
+		"failed_reads":     failedReads,
+		"success_rate_pct": fmt.Sprintf("%.1f", successRate),
+		"total_bytes_read": totalBytesRead,
+		"avg_latency_ns":   avgLatencyNs,
+		"avg_latency_ms":   fmt.Sprintf("%.3f", float64(avgLatencyNs)/1000000),
+	}
+}
+
+// Close shuts down the RDMA client and releases resources
+func (c *RDMAMountClient) Close() error {
+	// No need to close semaphore channel; closing it may cause panics if goroutines are still using it.
+	// The semaphore will be garbage collected when the client is no longer referenced.
+
+	// Log final statistics
+	stats := c.GetStats()
+	glog.Infof("RDMA mount client closing: %+v", stats)
+
+	return nil
+}
+
+// IsHealthy checks if the RDMA sidecar is currently healthy
+func (c *RDMAMountClient) IsHealthy() bool {
+	err := c.healthCheck()
+	return err == nil
+}
+
+// readFromTempFile performs zero-copy read from temp file using page cache
+func (c *RDMAMountClient) readFromTempFile(tempFilePath string, buffer []byte) (int, error) {
+	if tempFilePath == "" {
+		return 0, fmt.Errorf("empty temp file path")
+	}
+
+	// Open temp file for reading
+	file, err := os.Open(tempFilePath)
+	if err != nil {
+		return 0, fmt.Errorf("failed to open temp file %s: %w", tempFilePath, err)
+	}
+	defer file.Close()
+
+	// Read from temp file (this should be served from page cache)
+	n, err := file.Read(buffer)
+	if err != nil && err != io.EOF {
+		return n, fmt.Errorf("failed to read from temp file: %w", err)
+	}
+
+	glog.V(4).Infof("Zero-copy read: %d bytes from temp file %s", n, tempFilePath)
+
+	return n, nil
+}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 849b3ad0c..41896ff87 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -15,6 +15,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -62,6 +63,14 @@ type Option struct {
 	Cipher       bool // whether encrypt data on volume server
 	UidGidMapper *meta_cache.UidGidMapper
 
+	// RDMA acceleration options
+	RdmaEnabled       bool
+	RdmaSidecarAddr   string
+	RdmaFallback      bool
+	RdmaReadOnly      bool
+	RdmaMaxConcurrent int
+	RdmaTimeoutMs     int
+
 	uniqueCacheDirForRead  string
 	uniqueCacheDirForWrite string
 }
@@ -86,6 +95,7 @@ type WFS struct {
 	fuseServer  *fuse.Server
 	IsOverQuota bool
 	fhLockTable *util.LockTable[FileHandleId]
+	rdmaClient  *RDMAMountClient
 
 	FilerConf *filer.FilerConf
 }
@@ -138,8 +148,28 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 		wfs.metaCache.Shutdown()
 		os.RemoveAll(option.getUniqueCacheDirForWrite())
 		os.RemoveAll(option.getUniqueCacheDirForRead())
+		if wfs.rdmaClient != nil {
+			wfs.rdmaClient.Close()
+		}
 	})
 
+	// Initialize RDMA client if enabled
+	if option.RdmaEnabled && option.RdmaSidecarAddr != "" {
+		rdmaClient, err := NewRDMAMountClient(
+			option.RdmaSidecarAddr,
+			wfs.LookupFn(),
+			option.RdmaMaxConcurrent,
+			option.RdmaTimeoutMs,
+		)
+		if err != nil {
+			glog.Warningf("Failed to initialize RDMA client: %v", err)
+		} else {
+			wfs.rdmaClient = rdmaClient
+			glog.Infof("RDMA acceleration enabled: sidecar=%s, maxConcurrent=%d, timeout=%dms",
+				option.RdmaSidecarAddr, option.RdmaMaxConcurrent, option.RdmaTimeoutMs)
+		}
+	}
+
 	if wfs.option.ConcurrentWriters > 0 {
 		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
 		wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
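A note on the chunk lookup that the new chunkOffsetCache in weed/mount/filehandle.go supports: getCumulativeOffsets builds a prefix-sum array, so cumulativeOffsets[i] is the logical offset at which chunk i begins (treating the chunks as laid out back to back, the simplification tryRDMARead itself acknowledges). Because that array is non-decreasing, sort.Search can locate the containing chunk in O(log n) instead of scanning linearly. A self-contained sketch of that lookup with made-up chunk sizes (none of these numbers come from the commit):

    package main

    import (
        "fmt"
        "sort"
    )

    func main() {
        // Hypothetical chunk sizes; the real code derives these from filer_pb.FileChunk.Size.
        sizes := []int64{8, 4, 16}

        // Prefix sums: offsets[i] is where chunk i starts, offsets[len(sizes)] is the total size.
        offsets := make([]int64, len(sizes)+1)
        for i, s := range sizes {
            offsets[i+1] = offsets[i] + s
        }
        // offsets == [0, 8, 12, 28]

        readOffset := int64(10)

        // The smallest i with readOffset < offsets[i+1] is the chunk containing readOffset.
        i := sort.Search(len(sizes), func(i int) bool { return readOffset < offsets[i+1] })

        if i < len(sizes) && readOffset >= offsets[i] {
            // Prints: offset 10 falls in chunk 1 at intra-chunk offset 2
            fmt.Printf("offset %d falls in chunk %d at intra-chunk offset %d\n",
                readOffset, i, readOffset-offsets[i])
        }
    }

Caching the prefix sums on the FileHandle (and invalidating them whenever chunks change) avoids rebuilding this array on every read of a file with many chunks.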

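A note on the sidecar protocol: the sidecar's HTTP contract is only implied by the client code in weed/mount/rdma_client.go; the sidecar itself is not part of this diff. Inferred from the requests RDMAMountClient sends and the headers it reads back, a throwaway stub like the one below could stand in for the sidecar when exercising the mount path locally. It is a sketch of that implied contract under stated assumptions, not the real sidecar: it always reports healthy, always answers over the HTTP body, and never exercises the zero-copy temp-file path.

    package main

    import (
        "encoding/json"
        "log"
        "net/http"
    )

    func main() {
        mux := http.NewServeMux()

        // GET /health: healthCheck expects {"status":"healthy","rdma":{"enabled":...,"connected":...}}.
        mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
            resp := map[string]interface{}{
                "status": "healthy",
                "rdma":   map[string]bool{"enabled": true, "connected": true},
            }
            json.NewEncoder(w).Encode(resp)
        })

        // GET /read?file_id=...&offset=...&size=...&volume_server=...
        // ReadNeedle treats an X-Source header containing "rdma" (or X-RDMA-Used: true) as an
        // RDMA hit and, absent X-Use-Temp-File, reads the payload from the response body.
        mux.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("X-Source", "rdma-stub")
            w.Header().Set("X-RDMA-Used", "true")
            w.Write(make([]byte, 64)) // dummy payload; a real sidecar would return the needle bytes
        })

        // DELETE /cleanup?temp_file=... is only used for the zero-copy path; a no-op here.
        mux.HandleFunc("/cleanup", func(w http.ResponseWriter, r *http.Request) {
            w.WriteHeader(http.StatusOK)
        })

        log.Println("stub RDMA sidecar listening on :8081")
        log.Fatal(http.ListenAndServe(":8081", mux))
    }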