aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2021-12-19 22:43:14 -0800
committerchrislu <chris.lu@gmail.com>2021-12-19 22:43:14 -0800
commita152f179378319ab888cfbc0b4f0741f562faa64 (patch)
tree0d3354a3fee668c108c9811b2f2bc790e6c59ff9
parent85c526c583f6e1eb0a9499ba58fd9250ef25722d (diff)
downloadseaweedfs-a152f179378319ab888cfbc0b4f0741f562faa64.tar.xz
seaweedfs-a152f179378319ab888cfbc0b4f0741f562faa64.zip
mount: improve read performance on random reads
-rw-r--r--weed/filer/filechunk_manifest.go9
-rw-r--r--weed/filer/reader_at.go28
-rw-r--r--weed/filer/reader_pattern.go31
-rw-r--r--weed/filesys/filehandle.go3
-rw-r--r--weed/server/volume_server_handlers_read.go2
5 files changed, 66 insertions, 7 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 32008271b..b6a64b30d 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
}
+func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+ urlStrings, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+}
+
func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
var err error
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..27a4f29ec 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,6 +26,7 @@ type ChunkReadAt struct {
chunkCache chunk_cache.ChunkCache
lastChunkFileId string
lastChunkData []byte
+ readerPattern *ReaderPattern
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
- fileSize: fileSize,
+ chunkViews: chunkViews,
+ lookupFileId: lookupFn,
+ chunkCache: chunkCache,
+ fileSize: fileSize,
+ readerPattern: NewReaderPattern(),
}
}
@@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@@ -171,6 +175,10 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
+ if c.readerPattern.IsRandomMode() {
+ return c.doFetchRangeChunkData(chunkView, offset, length)
+ }
+
chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
if len(chunkSlice) > 0 {
return chunkSlice, nil
@@ -243,3 +251,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error)
return data, err
}
+
+func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
+
+ glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
+
+ data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
+
+ glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
+
+ return data, err
+
+}
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
new file mode 100644
index 000000000..2bf18d141
--- /dev/null
+++ b/weed/filer/reader_pattern.go
@@ -0,0 +1,31 @@
+package filer
+
+type ReaderPattern struct {
+ isStreaming bool
+ lastReadOffset int64
+}
+
+// For streaming read: only cache the first chunk
+// For random read: only fetch the requested range, instead of the whole chunk
+
+func NewReaderPattern() *ReaderPattern {
+ return &ReaderPattern{
+ isStreaming: true,
+ lastReadOffset: 0,
+ }
+}
+
+func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
+ if rp.lastReadOffset > offset {
+ rp.isStreaming = false
+ }
+ rp.lastReadOffset = offset
+}
+
+func (rp *ReaderPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *ReaderPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 34affddb9..4f7b7f37e 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -62,10 +62,11 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
fh.Lock()
defer fh.Unlock()
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+
if req.Size <= 0 {
return nil
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 38cc10d62..5ce2278bf 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -29,8 +29,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
-
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()