From d7f3acb2c056534f29950f3586d804ec274349b2 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 11 Apr 2020 12:45:24 -0700
Subject: refactor

---
 weed/util/chunk_cache/chunk_cache.go | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)
 create mode 100644 weed/util/chunk_cache/chunk_cache.go

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
new file mode 100644
index 000000000..e2676d9cc
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -0,0 +1,36 @@
+package chunk_cache
+
+import (
+	"time"
+
+	"github.com/karlseguin/ccache"
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCache struct {
+	cache *ccache.Cache
+}
+
+func NewChunkCache(maxEntries int64) *ChunkCache {
+	pruneCount := maxEntries >> 3
+	if pruneCount <= 0 {
+		pruneCount = 500
+	}
+	return &ChunkCache{
+		cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+	}
+}
+
+func (c *ChunkCache) GetChunk(fileId string) []byte {
+	item := c.cache.Get(fileId)
+	if item == nil {
+		return nil
+	}
+	data := item.Value().([]byte)
+	item.Extend(time.Hour)
+	return data
+}
+
+func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+	c.cache.Set(fileId, data, time.Hour)
+}
-- 
cgit v1.2.3

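For orientation, here is a minimal usage sketch of the API as it stands after this first commit; the file id and payload are invented for the example, and the signatures match this revision only, since later commits change them.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
)

func main() {
	// Keep at most 1000 recently used chunks in memory (backed by ccache).
	cache := chunk_cache.NewChunkCache(1000)

	// A hypothetical chunk keyed by its SeaweedFS file id.
	cache.SetChunk("3,01637037d6", []byte("chunk bytes"))

	// A hit returns the cached bytes and refreshes the entry's one-hour TTL.
	if data := cache.GetChunk("3,01637037d6"); data != nil {
		fmt.Printf("cache hit: %d bytes\n", len(data))
	}
}

Because ccache handles LRU pruning and expiry internally, the constructor only has to pick a maximum entry count and a prune batch size.
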
From df97da25f902912dd527d4aed567408c3ca0f9ae Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 11 Apr 2020 21:12:41 -0700
Subject: mount: add on disk caching

---
 weed/util/chunk_cache/chunk_cache.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 95 insertions(+), 16 deletions(-)

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index e2676d9cc..682f5185a 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -1,36 +1,115 @@
 package chunk_cache
 
 import (
-	"time"
+	"fmt"
+	"path"
+	"sort"
+	"sync"
 
-	"github.com/karlseguin/ccache"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 )
 
 // a global cache for recently accessed file chunks
 type ChunkCache struct {
-	cache *ccache.Cache
+	memCache   *ChunkCacheInMemory
+	diskCaches []*ChunkCacheVolume
+	sync.RWMutex
 }
 
-func NewChunkCache(maxEntries int64) *ChunkCache {
-	pruneCount := maxEntries >> 3
-	if pruneCount <= 0 {
-		pruneCount = 500
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
+	c := &ChunkCache{
+		memCache: NewChunkCacheInMemory(maxEntries),
 	}
-	return &ChunkCache{
-		cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+
+	volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+	if volumeCount < segmentCount {
+		volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+	}
+
+	for i := 0; i < volumeCount; i++ {
+		fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
+		diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+		if err != nil {
+			glog.Errorf("failed to add cache %s : %v", fileName, err)
+		} else {
+			c.diskCaches = append(c.diskCaches, diskCache)
+		}
 	}
+
+	// keep newest cache to the front
+	sort.Slice(c.diskCaches, func(i, j int) bool {
+		return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+	})
+
+	return c
 }
 
-func (c *ChunkCache) GetChunk(fileId string) []byte {
-	item := c.cache.Get(fileId)
-	if item == nil {
+func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+	c.RLock()
+	defer c.RUnlock()
+
+	if data = c.memCache.GetChunk(fileId); data != nil {
+		return data
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.Errorf("failed to parse file id %s", fileId)
 		return nil
 	}
-	data := item.Value().([]byte)
-	item.Extend(time.Hour)
-	return data
+	for _, diskCache := range c.diskCaches {
+		data, err = diskCache.GetNeedle(fid.Key)
+		if err == storage.ErrorNotFound {
+			continue
+		}
+		if err != nil {
+			glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
+			continue
+		}
+		if len(data) != 0 {
+			return
+		}
+	}
+	return nil
 }
 
 func (c *ChunkCache) SetChunk(fileId string, data []byte) {
-	c.cache.Set(fileId, data, time.Hour)
+	c.Lock()
+	defer c.Unlock()
+
+	c.memCache.SetChunk(fileId, data)
+
+	if len(c.diskCaches) == 0 {
+		return
+	}
+
+	if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+		t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+		if resetErr != nil {
+			glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+			return
+		}
+		for i := len(c.diskCaches) - 1; i > 0; i-- {
+			c.diskCaches[i] = c.diskCaches[i-1]
+		}
+		c.diskCaches[0] = t
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.Errorf("failed to parse file id %s", fileId)
+		return
+	}
+	c.diskCaches[0].WriteNeedle(fid.Key, data)
+
 }
+
+func (c *ChunkCache) Shutdown() {
+	c.Lock()
+	defer c.Unlock()
+	for _, diskCache := range c.diskCaches {
+		diskCache.Shutdown()
+	}
+}
\ No newline at end of file
-- 
cgit v1.2.3

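The disk layer added here does not evict individual chunks. It fills fixed-size cache volumes (roughly 30GB each, or diskSizeMB split evenly across segmentCount volumes when that is smaller) and, once the front volume is full, resets the oldest volume and promotes it to the front. The standalone sketch below illustrates the same rotation idea; the volume type and rotate helper are invented stand-ins, not the real ChunkCacheVolume.

package main

import "fmt"

// volume is a stand-in for one fixed-size on-disk cache file.
type volume struct {
	name string
	used int64
	cap  int64
}

// rotate wipes the oldest (last) volume and moves it to the front,
// mirroring the shift loop in ChunkCache.SetChunk above.
func rotate(vols []*volume) []*volume {
	oldest := vols[len(vols)-1]
	oldest.used = 0 // corresponds to Reset()
	for i := len(vols) - 1; i > 0; i-- {
		vols[i] = vols[i-1]
	}
	vols[0] = oldest
	return vols
}

func main() {
	vols := []*volume{
		{name: "cache_0", used: 95, cap: 100},
		{name: "cache_1", used: 80, cap: 100},
		{name: "cache_2", used: 60, cap: 100},
	}
	need := int64(10)
	if vols[0].used+need > vols[0].cap {
		vols = rotate(vols)
	}
	fmt.Println("front volume is now", vols[0].name) // cache_2, freshly reset
}

Resetting a whole volume at a time is coarser than per-entry eviction, but it keeps the on-disk bookkeeping trivial.
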
From b9b7da905ef9ef51d3e060ab1612becd63ab272d Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 12 Apr 2020 01:00:12 -0700
Subject: handle nil chunk cache

---
 weed/util/chunk_cache/chunk_cache.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 682f5185a..ead7a8d0b 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -47,6 +47,10 @@ func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount
 }
 
 func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+	if c == nil {
+		return
+	}
+
 	c.RLock()
 	defer c.RUnlock()
 
@@ -76,6 +80,9 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
 }
 
 func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+	if c == nil {
+		return
+	}
 	c.Lock()
 	defer c.Unlock()
 
@@ -107,6 +114,9 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {
 }
 
 func (c *ChunkCache) Shutdown() {
+	if c == nil {
+		return
+	}
 	c.Lock()
 	defer c.Unlock()
 	for _, diskCache := range c.diskCaches {
-- 
cgit v1.2.3


From 2a1f396df5abd47e7fc4a58c3bc39675e1e84e4f Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 12 Apr 2020 01:06:50 -0700
Subject: avoid duplicated setting chunks into cache

---
 weed/util/chunk_cache/chunk_cache.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index ead7a8d0b..48e4bfb0d 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -54,6 +54,10 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
 	c.RLock()
 	defer c.RUnlock()
 
+	return c.doGetChunk(fileId)
+}
+
+func (c *ChunkCache) doGetChunk(fileId string) (data []byte) {
 	if data = c.memCache.GetChunk(fileId); data != nil {
 		return data
 	}
@@ -86,6 +90,13 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {
 	c.Lock()
 	defer c.Unlock()
 
+	if existingData := c.doGetChunk(fileId); len(existingData)==0{
+		c.doSetChunk(fileId, data)
+	}
+}
+
+func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+
 	c.memCache.SetChunk(fileId, data)
 
 	if len(c.diskCaches) == 0 {
-- 
cgit v1.2.3


From 94e35cdb3552498b25824950bde94334c8b25331 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 12 Apr 2020 03:34:36 -0700
Subject: mount: fix bug found by git bisect, though I do not yet understand why
 it causes the error

---
 weed/util/chunk_cache/chunk_cache.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 48e4bfb0d..9f2a0518f 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -90,9 +90,7 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) {
 	c.Lock()
 	defer c.Unlock()
 
-	if existingData := c.doGetChunk(fileId); len(existingData)==0{
-		c.doSetChunk(fileId, data)
-	}
+	c.doSetChunk(fileId, data)
 }
 
 func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
-- 
cgit v1.2.3

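The "if c == nil" guards rely on Go allowing a method call on a nil pointer receiver: a caller that runs with caching disabled can hold a nil *ChunkCache, and every operation silently becomes a no-op. A minimal, self-contained illustration of that pattern (the cache type below is invented for the example):

package main

import "fmt"

type cache struct {
	m map[string][]byte
}

// Get is safe to call on a nil *cache: the receiver is just a pointer value,
// and the nil check runs before any field is touched.
func (c *cache) Get(key string) []byte {
	if c == nil {
		return nil
	}
	return c.m[key]
}

func main() {
	var disabled *cache // nil: caching turned off
	fmt.Println(disabled.Get("chunk") == nil) // true, no panic
}
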
cache %s : %v", fileName, err) - } else { - c.diskCaches = append(c.diskCaches, diskCache) - } + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), + diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize), } - // keep newest cache to the front - sort.Slice(c.diskCaches, func(i, j int) bool { - return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) - }) - return c } -func (c *ChunkCache) GetChunk(fileId string) (data []byte) { +func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { if c == nil { return } @@ -54,12 +41,15 @@ func (c *ChunkCache) GetChunk(fileId string) (data []byte) { c.RLock() defer c.RUnlock() - return c.doGetChunk(fileId) + return c.doGetChunk(fileId, chunkSize) } -func (c *ChunkCache) doGetChunk(fileId string) (data []byte) { - if data = c.memCache.GetChunk(fileId); data != nil { - return data +func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { + + if chunkSize < memCacheSizeLimit { + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } } fid, err := needle.ParseFileIdFromString(fileId) @@ -67,20 +57,9 @@ func (c *ChunkCache) doGetChunk(fileId string) (data []byte) { glog.Errorf("failed to parse file id %s", fileId) return nil } - for _, diskCache := range c.diskCaches { - data, err = diskCache.GetNeedle(fid.Key) - if err == storage.ErrorNotFound { - continue - } - if err != nil { - glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) - continue - } - if len(data) != 0 { - return - } - } - return nil + + return c.diskCache.getChunk(fid.Key) + } func (c *ChunkCache) SetChunk(fileId string, data []byte) { @@ -95,22 +74,8 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { func (c *ChunkCache) doSetChunk(fileId string, data []byte) { - c.memCache.SetChunk(fileId, data) - - if len(c.diskCaches) == 0 { - return - } - - if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { - t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() - if resetErr != nil { - glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) - return - } - for i := len(c.diskCaches) - 1; i > 0; i-- { - c.diskCaches[i] = c.diskCaches[i-1] - } - c.diskCaches[0] = t + if len(data) < memCacheSizeLimit { + c.memCache.SetChunk(fileId, data) } fid, err := needle.ParseFileIdFromString(fileId) @@ -118,7 +83,8 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) { glog.Errorf("failed to parse file id %s", fileId) return } - c.diskCaches[0].WriteNeedle(fid.Key, data) + + c.diskCache.setChunk(fid.Key, data) } @@ -128,7 +94,5 @@ func (c *ChunkCache) Shutdown() { } c.Lock() defer c.Unlock() - for _, diskCache := range c.diskCaches { - diskCache.Shutdown() - } + c.diskCache.shutdown() } -- cgit v1.2.3 From 2b5c4fbbf37e25adfa19b081c4adf5458b05b66c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 13 Apr 2020 22:19:27 -0700 Subject: tiered caching 1/4 for small less than 1MB files. 
From 2b5c4fbbf37e25adfa19b081c4adf5458b05b66c Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 13 Apr 2020 22:19:27 -0700
Subject: tiered caching: 1/4 of the disk cache for files smaller than 1MB, 1/4 for
 1~4MB files, and 1/2 for files bigger than 4MB

---
 weed/util/chunk_cache/chunk_cache.go | 43 +++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 29 insertions(+), 14 deletions(-)

(limited to 'weed/util/chunk_cache/chunk_cache.go')

diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 232e57a55..e1d4b639f 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -8,27 +8,27 @@ import (
 )
 
 const (
-	memCacheSizeLimit = 1024 * 1024
+	memCacheSizeLimit     = 1024 * 1024
+	onDiskCacheSizeLimit0 = memCacheSizeLimit
+	onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
 )
 
 // a global cache for recently accessed file chunks
 type ChunkCache struct {
-	memCache  *ChunkCacheInMemory
-	diskCache *OnDiskCacheLayer
+	memCache   *ChunkCacheInMemory
+	diskCaches []*OnDiskCacheLayer
 	sync.RWMutex
 }
 
-func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
-
-	volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
-	if volumeCount < segmentCount {
-		volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
-	}
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
 
 	c := &ChunkCache{
-		memCache:  NewChunkCacheInMemory(maxEntries),
-		diskCache: NewOnDiskCacheLayer(dir, "cache", volumeCount, volumeSize),
+		memCache: NewChunkCacheInMemory(maxEntries),
 	}
+	c.diskCaches = make([]*OnDiskCacheLayer, 3)
+	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
+	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
+	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
 
 	return c
 }
@@ -58,7 +58,14 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte)
 		return nil
 	}
 
-	return c.diskCache.getChunk(fid.Key)
+	for _, diskCache := range c.diskCaches {
+		data := diskCache.getChunk(fid.Key)
+		if len(data) != 0 {
+			return data
+		}
+	}
+
+	return nil
 
 }
 
@@ -84,7 +91,13 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
 		return
 	}
 
-	c.diskCache.setChunk(fid.Key, data)
+	if len(data) < onDiskCacheSizeLimit0 {
+		c.diskCaches[0].setChunk(fid.Key, data)
+	} else if len(data) < onDiskCacheSizeLimit1 {
+		c.diskCaches[1].setChunk(fid.Key, data)
+	} else {
+		c.diskCaches[2].setChunk(fid.Key, data)
+	}
 
 }
 
@@ -94,5 +107,7 @@ func (c *ChunkCache) Shutdown() {
 	}
 	c.Lock()
 	defer c.Unlock()
-	c.diskCache.shutdown()
+	for _, diskCache := range c.diskCaches {
+		diskCache.shutdown()
+	}
 }
-- 
cgit v1.2.3
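The final layout splits the on-disk budget into three tiers: a quarter of diskSizeMB for chunks under 1MB (prefix "c0_1"), a quarter for 1~4MB chunks ("c1_4"), and half for anything larger ("cache"). Below is a small sketch of the tier-selection rule, written as a standalone helper for illustration; the index it returns corresponds to c.diskCaches above.

package main

import "fmt"

const (
	memCacheSizeLimit     = 1024 * 1024           // 1MB
	onDiskCacheSizeLimit0 = memCacheSizeLimit     // tier 0: below 1MB
	onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit // tier 1: below 4MB
)

// tierFor mirrors the if/else chain in doSetChunk: chunks below 1MB land in
// diskCaches[0], chunks below 4MB in diskCaches[1], everything else in diskCaches[2].
func tierFor(chunkSize int) int {
	switch {
	case chunkSize < onDiskCacheSizeLimit0:
		return 0
	case chunkSize < onDiskCacheSizeLimit1:
		return 1
	default:
		return 2
	}
}

func main() {
	for _, size := range []int{512 * 1024, 2 * 1024 * 1024, 8 * 1024 * 1024} {
		fmt.Printf("%8d bytes -> diskCaches[%d]\n", size, tierFor(size))
	}
}

The likely motivation for the split is isolation: a burst of large chunks can only churn its own tier's space, so it cannot flush out a large number of small cached chunks.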