diff options
Diffstat (limited to 'weed/mount/filehandle_read.go')
| -rw-r--r-- | weed/mount/filehandle_read.go | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index 87cf76655..88b020bf1 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sort" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -64,6 +65,17 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return int64(totalRead), 0, nil } + // Try RDMA acceleration first if available + if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled { + totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry) + if err == nil { + glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) + return int64(totalRead), ts, nil + } + glog.V(4).Infof("RDMA read failed for %s, falling back to HTTP: %v", fileFullPath, err) + } + + // Fall back to normal chunk reading totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset) if err != nil && err != io.EOF { @@ -75,6 +87,61 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return int64(totalRead), ts, err } +// tryRDMARead attempts to read file data using RDMA acceleration +func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) { + // For now, we'll try to read the chunks directly using RDMA + // This is a simplified approach - in a full implementation, we'd need to + // handle chunk boundaries, multiple chunks, etc. + + chunks := entry.GetEntry().Chunks + if len(chunks) == 0 { + return 0, 0, fmt.Errorf("no chunks available for RDMA read") + } + + // Find the chunk that contains our offset using binary search + var targetChunk *filer_pb.FileChunk + var chunkOffset int64 + + // Get cached cumulative offsets for efficient binary search + cumulativeOffsets := fh.getCumulativeOffsets(chunks) + + // Use binary search to find the chunk containing the offset + chunkIndex := sort.Search(len(chunks), func(i int) bool { + return offset < cumulativeOffsets[i+1] + }) + + // Verify the chunk actually contains our offset + if chunkIndex < len(chunks) && offset >= cumulativeOffsets[chunkIndex] { + targetChunk = chunks[chunkIndex] + chunkOffset = offset - cumulativeOffsets[chunkIndex] + } + + if targetChunk == nil { + return 0, 0, fmt.Errorf("no chunk found for offset %d", offset) + } + + // Calculate how much to read from this chunk + remainingInChunk := int64(targetChunk.Size) - chunkOffset + readSize := min(int64(len(buff)), remainingInChunk) + + glog.V(4).Infof("RDMA read attempt: chunk=%s (fileId=%s), chunkOffset=%d, readSize=%d", + targetChunk.FileId, targetChunk.FileId, chunkOffset, readSize) + + // Try RDMA read using file ID directly (more efficient) + data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(ctx, targetChunk.FileId, uint64(chunkOffset), uint64(readSize)) + if err != nil { + return 0, 0, fmt.Errorf("RDMA read failed: %w", err) + } + + if !isRDMA { + return 0, 0, fmt.Errorf("RDMA not available for chunk") + } + + // Copy data to buffer + copied := copy(buff, data) + return int64(copied), targetChunk.ModifiedTsNs, nil +} + func (fh *FileHandle) downloadRemoteEntry(entry *LockedEntry) error { fileFullPath := fh.FullPath() |
