aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/weedfs_file_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/weedfs_file_read.go')
-rw-r--r--weed/mount/weedfs_file_read.go34
1 files changed, 31 insertions, 3 deletions
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index bf9c89071..dc79d3dc7 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -2,10 +2,12 @@ package mount
import (
"bytes"
+ "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -45,8 +47,17 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
+ // Create a context that will be cancelled when the cancel channel receives a signal
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ go func() {
+ select {
+ case <-cancel:
+ cancelFunc()
+ }
+ }()
+
offset := int64(in.Offset)
- totalRead, err := readDataByFileHandle(buff, fh, offset)
+ totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
return nil, fuse.EIO
@@ -59,7 +70,7 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
againBuff := make([]byte, len(buff))
- againRead, _ := readDataByFileHandle(againBuff, fh, offset)
+ againRead, _ := readDataByFileHandleWithContext(ctx, againBuff, fh, offset)
againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
@@ -88,3 +99,20 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
}
return n, err
}
+
+func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
+ // read data from source file
+ size := len(buff)
+ fhIn.lockForRead(offset, size)
+ defer fhIn.unlockForRead(offset, size)
+
+ n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
+ if err == nil || err == io.EOF {
+ maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
+ n = max(maxStop-offset, n)
+ }
+ if err == io.EOF {
+ err = nil
+ }
+ return n, err
+}