From d7f3acb2c056534f29950f3586d804ec274349b2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Apr 2020 12:45:24 -0700 Subject: refactor --- weed/util/chunk_cache/chunk_cache.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 weed/util/chunk_cache/chunk_cache.go (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go new file mode 100644 index 000000000..e2676d9cc --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCache struct { + cache *ccache.Cache +} + +func NewChunkCache(maxEntries int64) *ChunkCache { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCache{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCache) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCache) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} -- cgit v1.2.3 From df97da25f902912dd527d4aed567408c3ca0f9ae Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Apr 2020 21:12:41 -0700 Subject: mount: add on disk caching --- weed/util/chunk_cache/chunk_cache.go | 111 ++++++++++++++--- weed/util/chunk_cache/chunk_cache_in_memory.go | 36 ++++++ weed/util/chunk_cache/chunk_cache_on_disk.go | 145 ++++++++++++++++++++++ weed/util/chunk_cache/chunk_cache_on_disk_test.go | 58 +++++++++ 4 files changed, 334 insertions(+), 16 deletions(-) create mode 100644 weed/util/chunk_cache/chunk_cache_in_memory.go create mode 100644 weed/util/chunk_cache/chunk_cache_on_disk.go create mode 100644 weed/util/chunk_cache/chunk_cache_on_disk_test.go (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index e2676d9cc..682f5185a 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -1,36 +1,115 @@ package chunk_cache import ( - "time" + "fmt" + "path" + "sort" + "sync" - "github.com/karlseguin/ccache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // a global cache for recently accessed file chunks type ChunkCache struct { - cache *ccache.Cache + memCache *ChunkCacheInMemory + diskCaches []*ChunkCacheVolume + sync.RWMutex } -func NewChunkCache(maxEntries int64) *ChunkCache { - pruneCount := maxEntries >> 3 - if pruneCount <= 0 { - pruneCount = 500 +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), } - return &ChunkCache{ - cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("cache_%d", i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c } -func (c *ChunkCache) GetChunk(fileId string) []byte { - item := c.cache.Get(fileId) - if item == nil { +func (c *ChunkCache) GetChunk(fileId string) (data []byte) { + c.RLock() + defer c.RUnlock() + + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) return nil } - data := item.Value().([]byte) - item.Extend(time.Hour) - return data + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(fid.Key) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) + continue + } + if len(data) != 0 { + return + } + } + return nil } func (c *ChunkCache) SetChunk(fileId string, data []byte) { - c.cache.Set(fileId, data, time.Hour) + c.Lock() + defer c.Unlock() + + c.memCache.SetChunk(fileId, data) + + if len(c.diskCaches) == 0 { + return + } + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + c.diskCaches[0].WriteNeedle(fid.Key, data) + } + +func (c *ChunkCache) Shutdown() { + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } +} \ No newline at end of file diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..2c7ef8d39 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", key, err) + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..256b10139 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,58 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(6) + segmentCount := 2 + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + } + testData := make([]*test_data, writeCount) + for i:=0;i Date: Sun, 12 Apr 2020 01:00:12 -0700 Subject: handle nil chunk cache --- weed/util/chunk_cache/chunk_cache.go | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 682f5185a..ead7a8d0b 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -47,6 +47,10 @@ func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount } func (c *ChunkCache) GetChunk(fileId string) (data []byte) { + if c == nil { + return + } + c.RLock() defer c.RUnlock() @@ -76,6 +80,9 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) { } func (c *ChunkCache) SetChunk(fileId string, data []byte) { + if c == nil { + return + } c.Lock() defer c.Unlock() @@ -107,6 +114,9 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { } func (c *ChunkCache) Shutdown() { + if c == nil { + return + } c.Lock() defer c.Unlock() for _, diskCache := range c.diskCaches { -- cgit v1.2.3 From 2a1f396df5abd47e7fc4a58c3bc39675e1e84e4f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 01:06:50 -0700 Subject: avoid duplicated setting chunks into cache --- weed/util/chunk_cache/chunk_cache.go | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index ead7a8d0b..48e4bfb0d 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -54,6 +54,10 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) { c.RLock() defer c.RUnlock() + return c.doGetChunk(fileId) +} + +func (c *ChunkCache) doGetChunk(fileId string) (data []byte) { if data = c.memCache.GetChunk(fileId); data != nil { return data } @@ -86,6 +90,13 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { c.Lock() defer c.Unlock() + if existingData := c.doGetChunk(fileId); len(existingData)==0{ + c.doSetChunk(fileId, data) + } +} + +func (c *ChunkCache) doSetChunk(fileId string, data []byte) { + c.memCache.SetChunk(fileId, data) if len(c.diskCaches) == 0 { -- cgit v1.2.3 From 94e35cdb3552498b25824950bde94334c8b25331 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 03:34:36 -0700 Subject: mount: fix fix bug found by git bisect, but I do not understand why it can cause error! --- weed/util/chunk_cache/chunk_cache.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 48e4bfb0d..9f2a0518f 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -90,9 +90,7 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { c.Lock() defer c.Unlock() - if existingData := c.doGetChunk(fileId); len(existingData)==0{ - c.doSetChunk(fileId, data) - } + c.doSetChunk(fileId, data) } func (c *ChunkCache) doSetChunk(fileId string, data []byte) { -- cgit v1.2.3 From 7764e0465ce976bb528c27bb9aa25857102570ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 21:00:55 -0700 Subject: refactoring --- weed/util/chunk_cache/chunk_cache.go | 2 +- weed/util/chunk_cache/chunk_cache_on_disk_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 9f2a0518f..7c4a77304 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -131,4 +131,4 @@ func (c *ChunkCache) Shutdown() { for _, diskCache := range c.diskCaches { diskCache.Shutdown() } -} \ No newline at end of file +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index 256b10139..f93daf5a7 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -21,11 +21,11 @@ func TestOnDisk(t *testing.T) { writeCount := 5 type test_data struct { - data []byte + data []byte fileId string } testData := make([]*test_data, writeCount) - for i:=0;i Date: Mon, 13 Apr 2020 21:58:10 -0700 Subject: refactoring --- weed/util/chunk_cache/chunk_cache.go | 86 +++++++---------------- weed/util/chunk_cache/chunk_cache_on_disk_test.go | 6 +- weed/util/chunk_cache/on_disk_cache_layer.go | 83 ++++++++++++++++++++++ 3 files changed, 112 insertions(+), 63 deletions(-) create mode 100644 weed/util/chunk_cache/on_disk_cache_layer.go (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 7c4a77304..232e57a55 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -1,52 +1,39 @@ package chunk_cache import ( - "fmt" - "path" - "sort" "sync" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) +const ( + memCacheSizeLimit = 1024 * 1024 +) + // a global cache for recently accessed file chunks type ChunkCache struct { - memCache *ChunkCacheInMemory - diskCaches []*ChunkCacheVolume + memCache *ChunkCacheInMemory + diskCache *OnDiskCacheLayer sync.RWMutex } func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { - c := &ChunkCache{ - memCache: NewChunkCacheInMemory(maxEntries), - } volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) if volumeCount < segmentCount { volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) } - for i := 0; i < volumeCount; i++ { - fileName := path.Join(dir, fmt.Sprintf("cache_%d", i)) - diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) - if err != nil { - glog.Errorf("failed to add cache %s : %v", fileName, err) - } else { - c.diskCaches = append(c.diskCaches, diskCache) - } + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), + diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize), } - // keep newest cache to the front - sort.Slice(c.diskCaches, func(i, j int) bool { - return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) - }) - return c } -func (c *ChunkCache) GetChunk(fileId string) (data []byte) { +func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { if c == nil { return } @@ -54,12 +41,15 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) { c.RLock() defer c.RUnlock() - return c.doGetChunk(fileId) + return c.doGetChunk(fileId, chunkSize) } -func (c *ChunkCache) doGetChunk(fileId string) (data []byte) { - if data = c.memCache.GetChunk(fileId); data != nil { - return data +func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { + + if chunkSize < memCacheSizeLimit { + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } } fid, err := needle.ParseFileIdFromString(fileId) @@ -67,20 +57,9 @@ func (c *ChunkCache) doGetChunk(fileId string) (data []byte) { glog.Errorf("failed to parse file id %s", fileId) return nil } - for _, diskCache := range c.diskCaches { - data, err = diskCache.GetNeedle(fid.Key) - if err == storage.ErrorNotFound { - continue - } - if err != nil { - glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) - continue - } - if len(data) != 0 { - return - } - } - return nil + + return c.diskCache.getChunk(fid.Key) + } func (c *ChunkCache) SetChunk(fileId string, data []byte) { @@ -95,22 +74,8 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { func (c *ChunkCache) doSetChunk(fileId string, data []byte) { - c.memCache.SetChunk(fileId, data) - - if len(c.diskCaches) == 0 { - return - } - - if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { - t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() - if resetErr != nil { - glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) - return - } - for i := len(c.diskCaches) - 1; i > 0; i-- { - c.diskCaches[i] = c.diskCaches[i-1] - } - c.diskCaches[0] = t + if len(data) < memCacheSizeLimit { + c.memCache.SetChunk(fileId, data) } fid, err := needle.ParseFileIdFromString(fileId) @@ -118,7 +83,8 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) { glog.Errorf("failed to parse file id %s", fileId) return } - c.diskCaches[0].WriteNeedle(fid.Key, data) + + c.diskCache.setChunk(fid.Key, data) } @@ -128,7 +94,5 @@ func (c *ChunkCache) Shutdown() { } c.Lock() defer c.Unlock() - for _, diskCache := range c.diskCaches { - diskCache.Shutdown() - } + c.diskCache.shutdown() } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index f93daf5a7..63bcba2be 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -23,6 +23,7 @@ func TestOnDisk(t *testing.T) { type test_data struct { data []byte fileId string + size uint64 } testData := make([]*test_data, writeCount) for i := 0; i < writeCount; i++ { @@ -31,12 +32,13 @@ func TestOnDisk(t *testing.T) { testData[i] = &test_data{ data: buff, fileId: fmt.Sprintf("1,%daabbccdd", i+1), + size: uint64(len(buff)), } cache.SetChunk(testData[i].fileId, testData[i].data) } for i := 0; i < writeCount; i++ { - data := cache.GetChunk(testData[i].fileId) + data := cache.GetChunk(testData[i].fileId, testData[i].size) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } @@ -47,7 +49,7 @@ func TestOnDisk(t *testing.T) { cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) for i := 0; i < writeCount; i++ { - data := cache.GetChunk(testData[i].fileId) + data := cache.GetChunk(testData[i].fileId, testData[i].size) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go new file mode 100644 index 000000000..065188ac3 --- /dev/null +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -0,0 +1,83 @@ +package chunk_cache + +import ( + "fmt" + "path" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type OnDiskCacheLayer struct { + diskCaches []*ChunkCacheVolume +} + +func NewOnDiskCacheLayer(dir, namePrefix string, volumeCount int, volumeSize int64) *OnDiskCacheLayer{ + c := &OnDiskCacheLayer{} + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } + } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c +} + +func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) { + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + c.diskCaches[0].WriteNeedle(needleId, data) + +} + +func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte){ + + var err error + + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(needleId) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId) + continue + } + if len(data) != 0 { + return + } + } + + return nil + +} + +func (c *OnDiskCacheLayer) shutdown(){ + + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } + +} -- cgit v1.2.3 From 2b5c4fbbf37e25adfa19b081c4adf5458b05b66c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 13 Apr 2020 22:19:27 -0700 Subject: tiered caching 1/4 for small less than 1MB files. 1/4 for 1~4MB files, 1/2 for bigger than 4MB files --- weed/util/chunk_cache/chunk_cache.go | 43 +++++++++++++++-------- weed/util/chunk_cache/chunk_cache_on_disk_test.go | 7 ++-- weed/util/chunk_cache/on_disk_cache_layer.go | 8 ++++- 3 files changed, 39 insertions(+), 19 deletions(-) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 232e57a55..e1d4b639f 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -8,27 +8,27 @@ import ( ) const ( - memCacheSizeLimit = 1024 * 1024 + memCacheSizeLimit = 1024 * 1024 + onDiskCacheSizeLimit0 = memCacheSizeLimit + onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit ) // a global cache for recently accessed file chunks type ChunkCache struct { - memCache *ChunkCacheInMemory - diskCache *OnDiskCacheLayer + memCache *ChunkCacheInMemory + diskCaches []*OnDiskCacheLayer sync.RWMutex } -func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { - - volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) - if volumeCount < segmentCount { - volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) - } +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache { c := &ChunkCache{ - memCache: NewChunkCacheInMemory(maxEntries), - diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize), + memCache: NewChunkCacheInMemory(maxEntries), } + c.diskCaches = make([]*OnDiskCacheLayer, 3) + c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4) + c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4) + c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4) return c } @@ -58,7 +58,14 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { return nil } - return c.diskCache.getChunk(fid.Key) + for _, diskCache := range c.diskCaches { + data := diskCache.getChunk(fid.Key) + if len(data) != 0 { + return data + } + } + + return nil } @@ -84,7 +91,13 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) { return } - c.diskCache.setChunk(fid.Key, data) + if len(data) < onDiskCacheSizeLimit0 { + c.diskCaches[0].setChunk(fid.Key, data) + } else if len(data) < onDiskCacheSizeLimit1 { + c.diskCaches[1].setChunk(fid.Key, data) + } else { + c.diskCaches[2].setChunk(fid.Key, data) + } } @@ -94,5 +107,7 @@ func (c *ChunkCache) Shutdown() { } c.Lock() defer c.Unlock() - c.diskCache.shutdown() + for _, diskCache := range c.diskCaches { + diskCache.shutdown() + } } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index 63bcba2be..f061f2ba2 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -14,10 +14,9 @@ func TestOnDisk(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "c") defer os.RemoveAll(tmpDir) - totalDiskSizeMb := int64(6) - segmentCount := 2 + totalDiskSizeMb := int64(32) - cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb) writeCount := 5 type test_data struct { @@ -46,7 +45,7 @@ func TestOnDisk(t *testing.T) { cache.Shutdown() - cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + cache = NewChunkCache(0, tmpDir, totalDiskSizeMb) for i := 0; i < writeCount; i++ { data := cache.GetChunk(testData[i].fileId, testData[i].size) diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go index 065188ac3..9bd9c2b44 100644 --- a/weed/util/chunk_cache/on_disk_cache_layer.go +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -14,7 +14,13 @@ type OnDiskCacheLayer struct { diskCaches []*ChunkCacheVolume } -func NewOnDiskCacheLayer(dir, namePrefix string, volumeCount int, volumeSize int64) *OnDiskCacheLayer{ +func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer{ + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + c := &OnDiskCacheLayer{} for i := 0; i < volumeCount; i++ { fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) -- cgit v1.2.3 From cb3985be70c2b6eb9a0e00a04a6a02f8ebd650d5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2020 17:48:06 -0700 Subject: go fmt --- weed/util/chunk_cache/on_disk_cache_layer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'weed/util/chunk_cache') diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go index 9bd9c2b44..9cf8e3ab2 100644 --- a/weed/util/chunk_cache/on_disk_cache_layer.go +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -14,7 +14,7 @@ type OnDiskCacheLayer struct { diskCaches []*ChunkCacheVolume } -func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer{ +func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer { volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) if volumeCount < segmentCount { @@ -58,7 +58,7 @@ func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) { } -func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte){ +func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) { var err error @@ -80,7 +80,7 @@ func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte){ } -func (c *OnDiskCacheLayer) shutdown(){ +func (c *OnDiskCacheLayer) shutdown() { for _, diskCache := range c.diskCaches { diskCache.Shutdown() -- cgit v1.2.3