-rw-r--r--  weed/filer2/filechunks.go                         | 21
-rw-r--r--  weed/filer2/reader_at.go                          |  2
-rw-r--r--  weed/filer2/stream.go                             |  4
-rw-r--r--  weed/replication/sink/azuresink/azure_sink.go     |  2
-rw-r--r--  weed/replication/sink/b2sink/b2_sink.go           |  2
-rw-r--r--  weed/replication/sink/gcssink/gcs_sink.go         |  2
-rw-r--r--  weed/util/chunk_cache/chunk_cache.go              | 86
-rw-r--r--  weed/util/chunk_cache/chunk_cache_on_disk_test.go |  6
-rw-r--r--  weed/util/chunk_cache/on_disk_cache_layer.go      | 83
9 files changed, 130 insertions, 78 deletions
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 48eaeea27..2ddfb3c30 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -85,11 +85,15 @@ type ChunkView struct {
Offset int64
Size uint64
LogicOffset int64
- IsFullChunk bool
+ ChunkSize uint64
CipherKey []byte
IsGzipped bool
}
+func (cv *ChunkView) IsFullChunk() bool {
+ return cv.Size == cv.ChunkSize
+}
+
func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles := NonOverlappingVisibleIntervals(chunks)
@@ -111,13 +115,12 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
for _, chunk := range visibles {
if chunk.start <= offset && offset < chunk.stop && offset < stop {
- isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: offset - chunk.start, // offset is the data starting location in this file id
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
- IsFullChunk: isFullChunk,
+ ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
})
@@ -146,7 +149,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped)
length := len(visibles)
if length == 0 {
@@ -160,11 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
logPrintf(" before", visibles)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -216,18 +219,18 @@ type VisibleInterval struct {
stop int64
modifiedTime int64
fileId string
- isFullChunk bool
+ chunkSize uint64
cipherKey []byte
isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
- isFullChunk: isFullChunk,
+ chunkSize: chunkSize,
cipherKey: cipherKey,
isGzipped: isGzipped,
}
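
Note: the filechunks.go hunks above replace the stored IsFullChunk flag with the originating chunk's total size, so "full chunk" becomes a property derived from two sizes instead of state threaded through every constructor. A minimal, self-contained sketch of the idea (trimmed-down type and made-up sizes, not the full filer2 definitions):

package main

import "fmt"

// Sketch only: a trimmed-down ChunkView keeping just the fields this diff touches.
type ChunkView struct {
    Offset    int64
    Size      uint64 // bytes covered by this view
    ChunkSize uint64 // total size of the underlying chunk
}

// IsFullChunk is now derived: the view is a full chunk exactly when it covers
// the whole chunk, so callers no longer need a precomputed boolean.
func (cv *ChunkView) IsFullChunk() bool {
    return cv.Size == cv.ChunkSize
}

func main() {
    full := ChunkView{Offset: 0, Size: 8 << 20, ChunkSize: 8 << 20}
    partial := ChunkView{Offset: 1024, Size: 4096, ChunkSize: 8 << 20}
    fmt.Println(full.IsFullChunk(), partial.IsFullChunk()) // true false
}
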
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index c1ad1677f..f56ef6388 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -106,7 +106,7 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
// fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
hasDataInCache := false
- chunkData := c.chunkCache.GetChunk(chunkView.FileId)
+ chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
if chunkData != nil {
glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
hasDataInCache = true
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index bf3781ae2..3cb69f72b 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -31,7 +31,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
for _, chunkView := range chunkViews {
urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@@ -128,7 +128,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index d75dbe9af..aef97c06e 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -115,7 +115,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
})
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index b5d410a75..1e7d82ed4 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -103,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err := writer.Write(data)
if err != nil {
writeErr = err
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index b1a8d7753..bb5a54272 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -101,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
return err
}
- err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
wc.Write(data)
})
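
Note: stream.go and the three replication sinks change only at the call sites: util.ReadUrlAsStream keeps the same parameter order, and the fourth argument now comes from the IsFullChunk() method instead of the removed field. A hedged sketch of that call pattern (streamChunk and the package name are illustrative only, not code from this repository):

package example

import (
    "io"

    "github.com/chrislusf/seaweedfs/weed/filer2"
    "github.com/chrislusf/seaweedfs/weed/util"
)

// streamChunk copies one chunk view to w, assuming urlString has already been
// resolved for chunkView.FileId (as the sinks above do via their lookups).
func streamChunk(w io.Writer, urlString string, chunkView *filer2.ChunkView) error {
    return util.ReadUrlAsStream(
        urlString,
        chunkView.CipherKey,
        chunkView.IsGzipped,
        chunkView.IsFullChunk(), // previously the stored chunkView.IsFullChunk field
        chunkView.Offset,
        int(chunkView.Size),
        func(data []byte) {
            w.Write(data) // each streamed block is forwarded to the destination writer
        },
    )
}
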
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 7c4a77304..232e57a55 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -1,52 +1,39 @@
package chunk_cache
import (
- "fmt"
- "path"
- "sort"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
+const (
+ memCacheSizeLimit = 1024 * 1024
+)
+
// a global cache for recently accessed file chunks
type ChunkCache struct {
- memCache *ChunkCacheInMemory
- diskCaches []*ChunkCacheVolume
+ memCache *ChunkCacheInMemory
+ diskCache *OnDiskCacheLayer
sync.RWMutex
}
func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
- c := &ChunkCache{
- memCache: NewChunkCacheInMemory(maxEntries),
- }
volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
if volumeCount < segmentCount {
volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
}
- for i := 0; i < volumeCount; i++ {
- fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
- diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
- if err != nil {
- glog.Errorf("failed to add cache %s : %v", fileName, err)
- } else {
- c.diskCaches = append(c.diskCaches, diskCache)
- }
+ c := &ChunkCache{
+ memCache: NewChunkCacheInMemory(maxEntries),
+ diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize),
}
- // keep newest cache to the front
- sort.Slice(c.diskCaches, func(i, j int) bool {
- return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
- })
-
return c
}
-func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
if c == nil {
return
}
@@ -54,12 +41,15 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
c.RLock()
defer c.RUnlock()
- return c.doGetChunk(fileId)
+ return c.doGetChunk(fileId, chunkSize)
}
-func (c *ChunkCache) doGetChunk(fileId string) (data []byte) {
- if data = c.memCache.GetChunk(fileId); data != nil {
- return data
+func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+
+ if chunkSize < memCacheSizeLimit {
+ if data = c.memCache.GetChunk(fileId); data != nil {
+ return data
+ }
}
fid, err := needle.ParseFileIdFromString(fileId)
@@ -67,20 +57,9 @@ func (c *ChunkCache) doGetChunk(fileId string) (data []byte) {
glog.Errorf("failed to parse file id %s", fileId)
return nil
}
- for _, diskCache := range c.diskCaches {
- data, err = diskCache.GetNeedle(fid.Key)
- if err == storage.ErrorNotFound {
- continue
- }
- if err != nil {
- glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
- continue
- }
- if len(data) != 0 {
- return
- }
- }
- return nil
+
+ return c.diskCache.getChunk(fid.Key)
+
}
func (c *ChunkCache) SetChunk(fileId string, data []byte) {
@@ -95,22 +74,8 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {
func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
- c.memCache.SetChunk(fileId, data)
-
- if len(c.diskCaches) == 0 {
- return
- }
-
- if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
- t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
- if resetErr != nil {
- glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
- return
- }
- for i := len(c.diskCaches) - 1; i > 0; i-- {
- c.diskCaches[i] = c.diskCaches[i-1]
- }
- c.diskCaches[0] = t
+ if len(data) < memCacheSizeLimit {
+ c.memCache.SetChunk(fileId, data)
}
fid, err := needle.ParseFileIdFromString(fileId)
@@ -118,7 +83,8 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
glog.Errorf("failed to parse file id %s", fileId)
return
}
- c.diskCaches[0].WriteNeedle(fid.Key, data)
+
+ c.diskCache.setChunk(fid.Key, data)
}
@@ -128,7 +94,5 @@ func (c *ChunkCache) Shutdown() {
}
c.Lock()
defer c.Unlock()
- for _, diskCache := range c.diskCaches {
- diskCache.Shutdown()
- }
+ c.diskCache.shutdown()
}
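
Note: the rewritten ChunkCache is a two-tier cache. Only chunks smaller than memCacheSizeLimit (1 MiB) are kept in memory, every chunk is written to the disk layer keyed by its needle id, and on reads the memory tier is consulted only when the caller-supplied size says the chunk could have been cached there, which is why GetChunk now takes a chunkSize argument. A minimal sketch of that tiering decision, with plain maps standing in for ChunkCacheInMemory and OnDiskCacheLayer:

package main

import "fmt"

const memCacheSizeLimit = 1024 * 1024 // 1 MiB, as defined in this diff

// Stand-ins only: the real tiers are a bounded in-memory cache and
// needle-backed volume files, not plain maps.
type tieredCache struct {
    mem  map[string][]byte
    disk map[string][]byte
}

func newTieredCache() *tieredCache {
    return &tieredCache{mem: map[string][]byte{}, disk: map[string][]byte{}}
}

// setChunk mirrors doSetChunk: small chunks are also kept in memory,
// everything is written to the disk tier.
func (c *tieredCache) setChunk(fileId string, data []byte) {
    if len(data) < memCacheSizeLimit {
        c.mem[fileId] = data
    }
    c.disk[fileId] = data
}

// getChunk mirrors doGetChunk: the memory tier is only worth checking when the
// caller-supplied size says the chunk was small enough to be cached there.
func (c *tieredCache) getChunk(fileId string, chunkSize uint64) []byte {
    if chunkSize < memCacheSizeLimit {
        if data, ok := c.mem[fileId]; ok {
            return data
        }
    }
    return c.disk[fileId]
}

func main() {
    c := newTieredCache()
    c.setChunk("1,01abcd", make([]byte, 4096))
    fmt.Println(len(c.getChunk("1,01abcd", 4096))) // 4096, served from the memory tier
}
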
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index f93daf5a7..63bcba2be 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -23,6 +23,7 @@ func TestOnDisk(t *testing.T) {
type test_data struct {
data []byte
fileId string
+ size uint64
}
testData := make([]*test_data, writeCount)
for i := 0; i < writeCount; i++ {
@@ -31,12 +32,13 @@ func TestOnDisk(t *testing.T) {
testData[i] = &test_data{
data: buff,
fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+ size: uint64(len(buff)),
}
cache.SetChunk(testData[i].fileId, testData[i].data)
}
for i := 0; i < writeCount; i++ {
- data := cache.GetChunk(testData[i].fileId)
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
@@ -47,7 +49,7 @@ func TestOnDisk(t *testing.T) {
cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)
for i := 0; i < writeCount; i++ {
- data := cache.GetChunk(testData[i].fileId)
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
new file mode 100644
index 000000000..065188ac3
--- /dev/null
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -0,0 +1,83 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "path"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type OnDiskCacheLayer struct {
+ diskCaches []*ChunkCacheVolume
+}
+
+func NewOnDiskCacheLayer(dir, namePrefix string, volumeCount int, volumeSize int64) *OnDiskCacheLayer {
+ c := &OnDiskCacheLayer{}
+ for i := 0; i < volumeCount; i++ {
+ fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ if err != nil {
+ glog.Errorf("failed to add cache %s : %v", fileName, err)
+ } else {
+ c.diskCaches = append(c.diskCaches, diskCache)
+ }
+ }
+
+ // keep newest cache to the front
+ sort.Slice(c.diskCaches, func(i, j int) bool {
+ return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ })
+
+ return c
+}
+
+func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
+
+ if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+ t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+ if resetErr != nil {
+ glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+ return
+ }
+ for i := len(c.diskCaches) - 1; i > 0; i-- {
+ c.diskCaches[i] = c.diskCaches[i-1]
+ }
+ c.diskCaches[0] = t
+ }
+
+ c.diskCaches[0].WriteNeedle(needleId, data)
+
+}
+
+func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) {
+
+ var err error
+
+ for _, diskCache := range c.diskCaches {
+ data, err = diskCache.GetNeedle(needleId)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+ continue
+ }
+ if len(data) != 0 {
+ return
+ }
+ }
+
+ return nil
+
+}
+
+func (c *OnDiskCacheLayer) shutdown() {
+
+ for _, diskCache := range c.diskCaches {
+ diskCache.Shutdown()
+ }
+
+}
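
Note: the new OnDiskCacheLayer spreads the cache over a fixed set of volume files and keeps the most recently written one at index 0. Writes go to the front volume; when it would overflow, the oldest volume is reset and rotated to the front, so eviction happens a whole volume at a time, and reads scan from newest to oldest. A minimal sketch of that rotation, using in-memory counters and maps instead of real ChunkCacheVolume files:

package main

import "fmt"

// Sketch only: each "volume" is a used/limit counter plus a map, standing in
// for the needle-backed ChunkCacheVolume files used by the real layer.
type volume struct {
    used, limit int64
    entries     map[uint64][]byte
}

type onDiskLayer struct {
    volumes []*volume // index 0 is the most recently written volume
}

func newLayer(volumeCount int, volumeSize int64) *onDiskLayer {
    c := &onDiskLayer{}
    for i := 0; i < volumeCount; i++ {
        c.volumes = append(c.volumes, &volume{limit: volumeSize, entries: map[uint64][]byte{}})
    }
    return c
}

// setChunk mirrors the rotation above: if the front volume would overflow,
// the oldest volume is emptied (the real code calls Reset on it) and moved
// to the front, evicting a whole volume's worth of old entries at once.
func (c *onDiskLayer) setChunk(key uint64, data []byte) {
    if c.volumes[0].used+int64(len(data)) > c.volumes[0].limit {
        last := c.volumes[len(c.volumes)-1]
        last.used, last.entries = 0, map[uint64][]byte{}
        for i := len(c.volumes) - 1; i > 0; i-- {
            c.volumes[i] = c.volumes[i-1]
        }
        c.volumes[0] = last
    }
    c.volumes[0].entries[key] = data
    c.volumes[0].used += int64(len(data))
}

// getChunk scans volumes from newest to oldest, so recent writes win.
func (c *onDiskLayer) getChunk(key uint64) []byte {
    for _, v := range c.volumes {
        if data, ok := v.entries[key]; ok {
            return data
        }
    }
    return nil
}

func main() {
    c := newLayer(3, 1024)
    c.setChunk(1, make([]byte, 800))
    c.setChunk(2, make([]byte, 800)) // triggers one rotation
    fmt.Println(len(c.getChunk(1)), len(c.getChunk(2))) // 800 800
}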