aboutsummaryrefslogtreecommitdiff
path: root/weed/mount
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-08-06 10:09:26 -0700
committerGitHub <noreply@github.com>2025-08-06 10:09:26 -0700
commit4af182f88001033440f5d397f26b1c6f5e7635e1 (patch)
tree65e0de8aa22c2daa273fb514e772edeae0d7141e /weed/mount
parente446234e9c2512b01040fffcefc429a8d974808e (diff)
downloadseaweedfs-4af182f88001033440f5d397f26b1c6f5e7635e1.tar.xz
seaweedfs-4af182f88001033440f5d397f26b1c6f5e7635e1.zip
Context cancellation during reading range reading large files (#7093)
* context cancellation during reading range reading large files * address comments * cancellation for fuse read * fix cancellation * pass in context for each function to avoid racing condition * Update reader_at_test.go * remove dead code * Update weed/filer/reader_at.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/filer/filechunk_group.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/filer/filechunk_group.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * address comments * Update weed/mount/weedfs_file_read.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/mount/weedfs_file_lseek.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/mount/weedfs_file_read.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/filer/reader_at.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/mount/weedfs_file_lseek.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * test cancellation --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/mount')
-rw-r--r--weed/mount/filehandle.go5
-rw-r--r--weed/mount/filehandle_read.go6
-rw-r--r--weed/mount/weedfs_file_lseek.go15
-rw-r--r--weed/mount/weedfs_file_read.go34
4 files changed, 52 insertions, 8 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index f47d4a877..6cbc9745e 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -1,12 +1,13 @@
package mount
import (
+ "os"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "os"
- "sync"
)
type FileHandleId uint64
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index ce5f96341..87cf76655 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -23,6 +23,10 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs in
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
+ return fh.readFromChunksWithContext(context.Background(), buff, offset)
+}
+
+func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
@@ -60,7 +64,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e
return int64(totalRead), 0, nil
}
- totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
+ totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index 0cf7ef43b..73564fdbe 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -1,9 +1,11 @@
package mount
import (
- "github.com/seaweedfs/seaweedfs/weed/util"
+ "context"
"syscall"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -54,8 +56,17 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
+ // Create a context that will be cancelled when the cancel channel receives a signal
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ go func() {
+ select {
+ case <-cancel:
+ cancelFunc()
+ }
+ }()
+
// search chunks for the offset
- found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
+ found, offset := fh.entryChunkGroup.SearchChunks(ctx, offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index bf9c89071..dc79d3dc7 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -2,10 +2,12 @@ package mount
import (
"bytes"
+ "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -45,8 +47,17 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
+ // Create a context that will be cancelled when the cancel channel receives a signal
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ go func() {
+ select {
+ case <-cancel:
+ cancelFunc()
+ }
+ }()
+
offset := int64(in.Offset)
- totalRead, err := readDataByFileHandle(buff, fh, offset)
+ totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
return nil, fuse.EIO
@@ -59,7 +70,7 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
againBuff := make([]byte, len(buff))
- againRead, _ := readDataByFileHandle(againBuff, fh, offset)
+ againRead, _ := readDataByFileHandleWithContext(ctx, againBuff, fh, offset)
againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
@@ -88,3 +99,20 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
}
return n, err
}
+
+func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
+ // read data from source file
+ size := len(buff)
+ fhIn.lockForRead(offset, size)
+ defer fhIn.unlockForRead(offset, size)
+
+ n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
+ if err == nil || err == io.EOF {
+ maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
+ n = max(maxStop-offset, n)
+ }
+ if err == io.EOF {
+ err = nil
+ }
+ return n, err
+}