aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-26 02:16:47 -0800
committerchrislu <chris.lu@gmail.com>2022-02-26 02:16:47 -0800
commit28b395bef4887b55392af08f88aa345ea67b3d23 (patch)
tree988e4464663bc93a962c9f8c8947b0fb0bb1f768
parent3ad5fa6f6f513df152ff155b1fdd93a3c411b6ca (diff)
downloadseaweedfs-28b395bef4887b55392af08f88aa345ea67b3d23.tar.xz
seaweedfs-28b395bef4887b55392af08f88aa345ea67b3d23.zip
better control for reader caching
-rw-r--r--weed/filer/filechunk_manifest.go30
-rw-r--r--weed/filer/reader_at.go130
-rw-r--r--weed/filer/reader_at_test.go17
-rw-r--r--weed/filer/reader_cache.go187
-rw-r--r--weed/filer/stream.go18
-rw-r--r--weed/server/filer_server_handlers_read.go5
-rw-r--r--weed/util/chunk_cache/chunk_cache.go103
7 files changed, 240 insertions, 250 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index b6a64b30d..2c9dc5e74 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
@@ -77,7 +78,9 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// IsChunkManifest
- data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ data := mem.Allocate(int(chunk.Size))
+ defer mem.Free(data)
+ _, err := fetchChunk(data, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
@@ -92,38 +95,37 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) (int, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
+ return 0, err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+ return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0)
}
-func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+func fetchChunkRange(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
+ return 0, err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+ return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, false, offset)
}
-func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
- var err error
var shouldRetry bool
- receivedData := make([]byte, 0, size)
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- receivedData = receivedData[:0]
+ n = 0
if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString)
}
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
- receivedData = append(receivedData, data...)
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ x := copy(buffer[n:], data)
+ n += x
})
if !shouldRetry {
break
@@ -142,7 +144,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
}
}
- return receivedData, err
+ return n, err
}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b1c15152f..c3d801515 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -12,21 +12,15 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/golang/groupcache/singleflight"
)
type ChunkReadAt struct {
- masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- lookupFileId wdclient.LookupFileIdFunctionType
- readerLock sync.Mutex
- fileSize int64
-
- fetchGroup singleflight.Group
- chunkCache chunk_cache.ChunkCache
- lastChunkFileId string
- lastChunkData []byte
- readerPattern *ReaderPattern
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ readerLock sync.Mutex
+ fileSize int64
+ readerCache *ReaderCache
+ readerPattern *ReaderPattern
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -90,16 +84,14 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
return &ChunkReadAt{
chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
fileSize: fileSize,
+ readerCache: newReaderCache(5, chunkCache, lookupFn),
readerPattern: NewReaderPattern(),
}
}
func (c *ChunkReadAt) Close() error {
- c.lastChunkData = nil
- c.lastChunkFileId = ""
+ c.readerCache.destroy()
return nil
}
@@ -142,16 +134,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- var buffer []byte
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
- bufferLength := chunkStop - chunkStart
- buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength))
+ copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunk, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return
+ return copied, err
}
- copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer)
n += copied
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
@@ -173,104 +162,15 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *ChunkView, offset uint64) (n int, err error) {
- var chunkSlice []byte
- if chunkView.LogicOffset == 0 {
- chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
- }
- if len(chunkSlice) > 0 {
- return chunkSlice, nil
- }
- if c.lookupFileId == nil {
- return nil, nil
- }
if c.readerPattern.IsRandomMode() {
- return c.doFetchRangeChunkData(chunkView, offset, length)
- }
- chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews)
- if err != nil {
- return nil, err
+ return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
}
- wanted := min(int64(length), int64(len(chunkData))-int64(offset))
- return chunkData[offset : int64(offset)+wanted], nil
-}
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
-
- if c.lastChunkFileId == chunkView.FileId {
- return c.lastChunkData, nil
+ n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+ if nextChunkViews != nil {
+ c.readerCache.MaybeCache(nextChunkViews.FileId, nextChunkViews.CipherKey, nextChunkViews.IsGzipped, int(nextChunkViews.ChunkSize))
}
-
- v, doErr := c.readOneWholeChunk(chunkView)
-
- if doErr != nil {
- return nil, doErr
- }
-
- chunkData = v.([]byte)
-
- c.lastChunkData = chunkData
- c.lastChunkFileId = chunkView.FileId
-
- for _, nextChunkView := range nextChunkViews {
- if c.chunkCache != nil && nextChunkView != nil {
- go c.readOneWholeChunk(nextChunkView)
- }
- }
-
return
}
-
-func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
-
- var err error
-
- return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
-
- glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
-
- var data []byte
- if chunkView.LogicOffset == 0 {
- data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
- }
- if data != nil {
- glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
- } else {
- var err error
- data, err = c.doFetchFullChunkData(chunkView)
- if err != nil {
- return data, err
- }
- if chunkView.LogicOffset == 0 {
- // only cache the first chunk
- c.chunkCache.SetChunk(chunkView.FileId, data)
- }
- }
- return data, err
- })
-}
-
-func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
-
- glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
- data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
-
- glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
- return data, err
-
-}
-
-func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
-
- glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
- data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
-
- glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
- return data, err
-
-}
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index 411d7eb3b..2d7bf90e9 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -21,8 +21,12 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
return data
}
-func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- return m.GetChunk(fileId, length)
+func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+ x, _ := strconv.Atoi(fileId)
+ for i := 0; i < len(data); i++ {
+ data[i] = byte(x)
+ }
+ return len(data), nil
}
func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
@@ -65,10 +69,9 @@ func TestReaderAt(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 10,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
@@ -116,10 +119,9 @@ func TestReaderAt0(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 10,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
@@ -145,10 +147,9 @@ func TestReaderAt1(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 20,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
new file mode 100644
index 000000000..f6dac6e0a
--- /dev/null
+++ b/weed/filer/reader_cache.go
@@ -0,0 +1,187 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "sync"
+ "time"
+)
+
+type ReaderCache struct {
+ chunkCache chunk_cache.ChunkCache
+ lookupFileIdFn wdclient.LookupFileIdFunctionType
+ sync.Mutex
+ downloaders map[string]*SingleChunkCacher
+ limit int
+}
+
+type SingleChunkCacher struct {
+ sync.RWMutex
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ completedTime time.Time
+}
+
+func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
+ return &ReaderCache{
+ limit: limit,
+ chunkCache: chunkCache,
+ lookupFileIdFn: lookupFileIdFn,
+ downloaders: make(map[string]*SingleChunkCacher),
+ }
+}
+
+func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
+ rc.Lock()
+ defer rc.Unlock()
+ if _, found := rc.downloaders[fileId]; found {
+ return
+ }
+ if rc.lookupFileIdFn == nil {
+ return
+ }
+
+ // if too many, delete one of them?
+ glog.V(0).Infof("downloader2 %d", len(rc.downloaders))
+ if len(rc.downloaders) >= rc.limit {
+ oldestFid, oldestTime := "", time.Now()
+ for fid, downloader := range rc.downloaders {
+ if !downloader.completedTime.IsZero() {
+ if downloader.completedTime.Before(oldestTime) {
+ oldestFid, oldestTime = fid, downloader.completedTime
+ }
+ }
+ }
+ if oldestFid != "" {
+ oldDownloader := rc.downloaders[oldestFid]
+ delete(rc.downloaders, oldestFid)
+ oldDownloader.destroy()
+ } else {
+ // if still no slots, return
+ return
+ }
+ }
+
+ cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[fileId] = cacher
+
+ return
+}
+
+func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+ rc.Lock()
+ defer rc.Unlock()
+ if cacher, found := rc.downloaders[fileId]; found {
+ return cacher.readChunkAt(buffer, offset)
+ }
+ if shouldCache || rc.lookupFileIdFn == nil {
+ n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
+ if n > 0 {
+ return n, err
+ }
+ }
+
+ glog.V(0).Infof("downloader1 %d", len(rc.downloaders))
+ if len(rc.downloaders) >= rc.limit {
+ oldestFid, oldestTime := "", time.Now()
+ for fid, downloader := range rc.downloaders {
+ if !downloader.completedTime.IsZero() {
+ if downloader.completedTime.Before(oldestTime) {
+ oldestFid, oldestTime = fid, downloader.completedTime
+ }
+ }
+ }
+ if oldestFid != "" {
+ oldDownloader := rc.downloaders[oldestFid]
+ delete(rc.downloaders, oldestFid)
+ oldDownloader.destroy()
+ }
+ }
+
+ cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[fileId] = cacher
+
+ return cacher.readChunkAt(buffer, offset)
+}
+
+func (rc *ReaderCache) destroy() {
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, downloader := range rc.downloaders {
+ downloader.destroy()
+ }
+
+}
+
+func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
+ t := &SingleChunkCacher{
+ parent: parent,
+ chunkFileId: fileId,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ chunkSize: chunkSize,
+ shouldCache: shouldCache,
+ }
+ return t
+}
+
+func (s *SingleChunkCacher) startCaching() {
+ s.Lock()
+ defer s.Unlock()
+
+ s.wg.Done() // means this has been started
+
+ urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
+ if err != nil {
+ s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+ return
+ }
+
+ s.data = mem.Allocate(s.chunkSize)
+
+ _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ if s.err != nil {
+ mem.Free(s.data)
+ s.data = nil
+ return
+ }
+
+ s.completedTime = time.Now()
+ if s.shouldCache {
+ s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ }
+
+ return
+}
+
+func (s *SingleChunkCacher) destroy() {
+ if s.data != nil {
+ mem.Free(s.data)
+ s.data = nil
+ }
+}
+
+func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+ s.RLock()
+ defer s.RUnlock()
+
+ return copy(buf, s.data[offset:]), s.err
+
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index e5163f2d9..b65641cbf 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -133,30 +133,30 @@ func writeZero(w io.Writer, size int64) (err error) {
return
}
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
-
- buffer := bytes.Buffer{}
+func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+
+ idx := 0
for _, chunkView := range chunkViews {
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ return err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
if err != nil {
- return nil, err
+ return err
}
- buffer.Write(data)
+ idx += n
}
- return buffer.Bytes(), nil
+ return nil
}
// ---------------- ChunkStreamReader ----------------------------------
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 2c51931c1..431eea979 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"mime"
"net/http"
@@ -189,7 +190,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
- data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ data := mem.Allocate(int(totalSize))
+ defer mem.Free(data)
+ err := filer.ReadAll(data, fs.filer.MasterClient, entry.Chunks)
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusNotModified)
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 5c4be17a1..3f3b264b1 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -11,8 +11,6 @@ import (
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
type ChunkCache interface {
- GetChunk(fileId string, minSize uint64) (data []byte)
- GetChunkSlice(fileId string, offset, length uint64) []byte
ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
SetChunk(fileId string, data []byte)
}
@@ -45,107 +43,6 @@ func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, uni
return c
}
-func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
- if c == nil {
- return
- }
-
- c.RLock()
- defer c.RUnlock()
-
- return c.doGetChunk(fileId, minSize)
-}
-
-func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
-
- if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.memCache.GetChunk(fileId)
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- fid, err := needle.ParseFileIdFromString(fileId)
- if err != nil {
- glog.Errorf("failed to parse file id %s", fileId)
- return nil
- }
-
- if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.diskCaches[0].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
- if minSize <= c.onDiskCacheSizeLimit1 {
- data = c.diskCaches[1].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
- {
- data = c.diskCaches[2].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- return nil
-
-}
-
-func (c *TieredChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- if c == nil {
- return nil
- }
-
- c.RLock()
- defer c.RUnlock()
-
- return c.doGetChunkSlice(fileId, offset, length)
-}
-
-func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64) (data []byte) {
-
- minSize := offset + length
- if minSize <= c.onDiskCacheSizeLimit0 {
- data, err := c.memCache.getChunkSlice(fileId, offset, length)
- if err != nil {
- glog.Errorf("failed to read from memcache: %s", err)
- }
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- fid, err := needle.ParseFileIdFromString(fileId)
- if err != nil {
- glog.Errorf("failed to parse file id %s", fileId)
- return nil
- }
-
- if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.diskCaches[0].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
- }
- }
- if minSize <= c.onDiskCacheSizeLimit1 {
- data = c.diskCaches[1].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
- }
- }
- {
- data = c.diskCaches[2].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- return nil
-}
-
func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
if c == nil {
return 0, nil