diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-06 10:09:26 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-06 10:09:26 -0700 |
| commit | 4af182f88001033440f5d397f26b1c6f5e7635e1 (patch) | |
| tree | 65e0de8aa22c2daa273fb514e772edeae0d7141e /weed/mount | |
| parent | e446234e9c2512b01040fffcefc429a8d974808e (diff) | |
| download | seaweedfs-4af182f88001033440f5d397f26b1c6f5e7635e1.tar.xz seaweedfs-4af182f88001033440f5d397f26b1c6f5e7635e1.zip | |
Context cancellation during reading range reading large files (#7093)
* context cancellation during reading range reading large files
* address comments
* cancellation for fuse read
* fix cancellation
* pass in context for each function to avoid racing condition
* Update reader_at_test.go
* remove dead code
* Update weed/filer/reader_at.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/filer/filechunk_group.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/filer/filechunk_group.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* address comments
* Update weed/mount/weedfs_file_read.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/mount/weedfs_file_lseek.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/mount/weedfs_file_read.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/filer/reader_at.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/mount/weedfs_file_lseek.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* test cancellation
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/mount')
| -rw-r--r-- | weed/mount/filehandle.go | 5 | ||||
| -rw-r--r-- | weed/mount/filehandle_read.go | 6 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_lseek.go | 15 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_read.go | 34 |
4 files changed, 52 insertions, 8 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index f47d4a877..6cbc9745e 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -1,12 +1,13 @@ package mount import ( + "os" + "sync" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "os" - "sync" ) type FileHandleId uint64 diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index ce5f96341..87cf76655 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -23,6 +23,10 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs in } func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) { + return fh.readFromChunksWithContext(context.Background(), buff, offset) +} + +func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) { fh.entryLock.RLock() defer fh.entryLock.RUnlock() @@ -60,7 +64,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e return int64(totalRead), 0, nil } - totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset) + totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset) if err != nil && err != io.EOF { glog.Errorf("file handle read %s: %v", fileFullPath, err) diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 0cf7ef43b..73564fdbe 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -1,9 +1,11 @@ package mount import ( - "github.com/seaweedfs/seaweedfs/weed/util" + "context" "syscall" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -54,8 +56,17 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO return ENXIO } + // Create a context that will be cancelled when the cancel channel receives a signal + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + select { + case <-cancel: + cancelFunc() + } + }() + // search chunks for the offset - found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence) + found, offset := fh.entryChunkGroup.SearchChunks(ctx, offset, fileSize, in.Whence) if found { out.Offset = uint64(offset) return fuse.OK diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index bf9c89071..dc79d3dc7 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -2,10 +2,12 @@ package mount import ( "bytes" + "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "io" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -45,8 +47,17 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock) defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) + // Create a context that will be cancelled when the cancel channel receives a signal + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + select { + case <-cancel: + cancelFunc() + } + }() + offset := int64(in.Offset) - totalRead, err := readDataByFileHandle(buff, fh, offset) + totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset) if err != nil { glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err) return nil, fuse.EIO @@ -59,7 +70,7 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse if bytes.Compare(mirrorData, buff[:totalRead]) != 0 { againBuff := make([]byte, len(buff)) - againRead, _ := readDataByFileHandle(againBuff, fh, offset) + againRead, _ := readDataByFileHandleWithContext(ctx, againBuff, fh, offset) againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0 againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0 @@ -88,3 +99,20 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e } return n, err } + +func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) { + // read data from source file + size := len(buff) + fhIn.lockForRead(offset, size) + defer fhIn.unlockForRead(offset, size) + + n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset) + if err == nil || err == io.EOF { + maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs) + n = max(maxStop-offset, n) + } + if err == io.EOF { + err = nil + } + return n, err +} |
