Diffstat (limited to 'weed/mount/filehandle_read.go')
-rw-r--r--  weed/mount/filehandle_read.go  67
1 file changed, 67 insertions(+), 0 deletions(-)
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 87cf76655..88b020bf1 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "sort"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -64,6 +65,17 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return int64(totalRead), 0, nil
}

+ // Try RDMA acceleration first if available
+ if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled {
+ totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry)
+ if err == nil {
+ glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+totalRead, totalRead)
+ return totalRead, ts, nil
+ }
+ glog.V(4).Infof("RDMA read failed for %s, falling back to HTTP: %v", fileFullPath, err)
+ }
+
+ // Fall back to normal chunk reading
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
if err != nil && err != io.EOF {
@@ -75,6 +87,61 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return int64(totalRead), ts, err
}
+// tryRDMARead attempts to read file data using RDMA acceleration
+func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) {
+ // For now, this reads via RDMA from at most one chunk per call; a read
+ // that spans a chunk boundary is truncated at the boundary. A full
+ // implementation would handle reads spanning multiple chunks.
+
+ chunks := entry.GetEntry().Chunks
+ if len(chunks) == 0 {
+ return 0, 0, fmt.Errorf("no chunks available for RDMA read")
+ }
+
+ // Find the chunk that contains our offset using binary search
+ var targetChunk *filer_pb.FileChunk
+ var chunkOffset int64
+
+ // Get cached cumulative offsets for efficient binary search
+ cumulativeOffsets := fh.getCumulativeOffsets(chunks)
+
+ // Use binary search to find the chunk containing the offset
+ chunkIndex := sort.Search(len(chunks), func(i int) bool {
+ return offset < cumulativeOffsets[i+1]
+ })
+
+ // Verify the chunk actually contains our offset
+ if chunkIndex < len(chunks) && offset >= cumulativeOffsets[chunkIndex] {
+ targetChunk = chunks[chunkIndex]
+ chunkOffset = offset - cumulativeOffsets[chunkIndex]
+ }
+
+ if targetChunk == nil {
+ return 0, 0, fmt.Errorf("no chunk found for offset %d", offset)
+ }
+
+ // Calculate how much to read from this chunk
+ remainingInChunk := int64(targetChunk.Size) - chunkOffset
+ readSize := min(int64(len(buff)), remainingInChunk)
+
+ glog.V(4).Infof("RDMA read attempt: chunk=%s (fileId=%s), chunkOffset=%d, readSize=%d",
+ targetChunk.FileId, targetChunk.FileId, chunkOffset, readSize)
+
+ // Try the RDMA read addressed directly by the chunk's file ID
+ data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(ctx, targetChunk.FileId, uint64(chunkOffset), uint64(readSize))
+ if err != nil {
+ return 0, 0, fmt.Errorf("RDMA read failed: %w", err)
+ }
+
+ if !isRDMA {
+ return 0, 0, fmt.Errorf("RDMA not available for chunk")
+ }
+
+ // Copy data to buffer
+ copied := copy(buff, data)
+ return int64(copied), targetChunk.ModifiedTsNs, nil
+}
+
func (fh *FileHandle) downloadRemoteEntry(entry *LockedEntry) error {
fileFullPath := fh.FullPath()
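
The getCumulativeOffsets helper that tryRDMARead calls is defined elsewhere in this commit and does not appear in this hunk. From the call site its shape can be inferred: it returns len(chunks)+1 running totals, with entry 0 equal to 0 and entry i+1 equal to the end of chunk i, which is exactly what the sort.Search predicate indexes with i+1. A minimal sketch under those assumptions follows; the cumulativeOffsetsCache field and its length-based invalidation are hypothetical, not the committed implementation.

// Sketch only. Assumes chunks are sorted by file offset and non-overlapping,
// so running totals of chunk sizes equal logical file offsets.
func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
	// Reuse the cached slice while it still matches the chunk count.
	// A real implementation needs a stronger invalidation check, since
	// the chunk list can change without changing its length.
	if len(fh.cumulativeOffsetsCache) == len(chunks)+1 {
		return fh.cumulativeOffsetsCache
	}
	// offsets[i] is the logical start of chunks[i];
	// offsets[len(chunks)] is the end of the last chunk.
	offsets := make([]int64, len(chunks)+1)
	for i, chunk := range chunks {
		offsets[i+1] = offsets[i] + int64(chunk.Size)
	}
	fh.cumulativeOffsetsCache = offsets
	return offsets
}

With this layout, sort.Search returns the first index i with offset < offsets[i+1], i.e. the chunk whose half-open range [offsets[i], offsets[i+1]) contains the requested offset.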