aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-25 21:55:04 -0800
committerchrislu <chris.lu@gmail.com>2022-02-25 21:55:04 -0800
commit3ad5fa6f6f513df152ff155b1fdd93a3c411b6ca (patch)
treef8547d0111603a2765c0179a0f00c1817a0ea9f9
parentfc7a4957eae6a3f58fb52c88210019e4af89a290 (diff)
downloadseaweedfs-3ad5fa6f6f513df152ff155b1fdd93a3c411b6ca.tar.xz
seaweedfs-3ad5fa6f6f513df152ff155b1fdd93a3c411b6ca.zip
chunk cache adds function ReadChunkAt
-rw-r--r--weed/util/chunk_cache/chunk_cache.go49
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go18
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk.go22
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk_test.go25
-rw-r--r--weed/util/chunk_cache/on_disk_cache_layer.go20
5 files changed, 125 insertions, 9 deletions
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 40d24b322..5c4be17a1 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -13,6 +13,7 @@ var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
type ChunkCache interface {
GetChunk(fileId string, minSize uint64) (data []byte)
GetChunkSlice(fileId string, offset, length uint64) []byte
+ ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
SetChunk(fileId string, data []byte)
}
@@ -145,6 +146,54 @@ func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64)
return nil
}
+func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+ if c == nil {
+ return 0, nil
+ }
+
+ c.RLock()
+ defer c.RUnlock()
+
+ minSize := offset + uint64(len(data))
+ if minSize <= c.onDiskCacheSizeLimit0 {
+ n, err = c.memCache.readChunkAt(data, fileId, offset)
+ if err != nil {
+ glog.Errorf("failed to read from memcache: %s", err)
+ }
+ if n >= int(minSize) {
+ return n, nil
+ }
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return n, nil
+ }
+
+ if minSize <= c.onDiskCacheSizeLimit0 {
+ n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
+ }
+ }
+ if minSize <= c.onDiskCacheSizeLimit1 {
+ n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
+ }
+ }
+ {
+ n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
+ }
+ }
+
+ return 0, nil
+
+}
+
func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
if c == nil {
return
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index d725f8a16..2982d0979 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -1,9 +1,8 @@
package chunk_cache
import (
- "time"
-
"github.com/karlseguin/ccache/v2"
+ "time"
)
// a global cache for recently accessed file chunks
@@ -45,6 +44,21 @@ func (c *ChunkCacheInMemory) getChunkSlice(fileId string, offset, length uint64)
return data[offset : int(offset)+wanted], nil
}
+func (c *ChunkCacheInMemory) readChunkAt(buffer []byte, fileId string, offset uint64) (int, error) {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return 0, nil
+ }
+ data := item.Value().([]byte)
+ item.Extend(time.Hour)
+ wanted := min(len(buffer), len(data)-int(offset))
+ if wanted < 0 {
+ return 0, ErrorOutOfBounds
+ }
+ n := copy(buffer, data[offset:int(offset)+wanted])
+ return n, nil
+}
+
func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
localCopy := make([]byte, len(data))
copy(localCopy, data)
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index 36de5c972..100b5919e 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -144,6 +144,28 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
return data, nil
}
+func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) {
+ nv, ok := v.nm.Get(key)
+ if !ok {
+ return 0, storage.ErrorNotFound
+ }
+ wanted := min(len(data), int(nv.Size)-int(offset))
+ if wanted < 0 {
+ // should never happen, but better than panicing
+ return 0, ErrorOutOfBounds
+ }
+ if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil {
+ return n, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
+ } else {
+ if n != wanted {
+ return n, fmt.Errorf("read %d, expected %d", n, wanted)
+ }
+ }
+
+ return n, nil
+}
+
func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
offset := v.fileSize
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index 1e7738fa2..8c7880eee 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -3,6 +3,7 @@ package chunk_cache
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"math/rand"
"testing"
)
@@ -18,7 +19,7 @@ func TestOnDisk(t *testing.T) {
type test_data struct {
data []byte
fileId string
- size uint64
+ size int
}
testData := make([]*test_data, writeCount)
for i := 0; i < writeCount; i++ {
@@ -27,29 +28,35 @@ func TestOnDisk(t *testing.T) {
testData[i] = &test_data{
data: buff,
fileId: fmt.Sprintf("1,%daabbccdd", i+1),
- size: uint64(len(buff)),
+ size: len(buff),
}
cache.SetChunk(testData[i].fileId, testData[i].data)
// read back right after write
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
for i := 0; i < 2; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) == 0 {
t.Errorf("old cache should have been purged: %d", i)
}
+ mem.Free(data)
}
for i := 2; i < writeCount; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
cache.Shutdown()
@@ -57,10 +64,12 @@ func TestOnDisk(t *testing.T) {
cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
for i := 0; i < 2; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) == 0 {
t.Errorf("old cache should have been purged: %d", i)
}
+ mem.Free(data)
}
for i := 2; i < writeCount; i++ {
@@ -83,10 +92,12 @@ func TestOnDisk(t *testing.T) {
*/
continue
}
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
cache.Shutdown()
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
index 3a656110e..9115b1bb1 100644
--- a/weed/util/chunk_cache/on_disk_cache_layer.go
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -108,6 +108,26 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length
}
+func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) {
+
+ for _, diskCache := range c.diskCaches {
+ n, err = diskCache.readNeedleSliceAt(buffer, needleId, offset)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err)
+ continue
+ }
+ if n > 0 {
+ return
+ }
+ }
+
+ return
+
+}
+
func (c *OnDiskCacheLayer) shutdown() {
for _, diskCache := range c.diskCaches {