aboutsummaryrefslogtreecommitdiff
path: root/weed/util/chunk_cache/chunk_cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/chunk_cache/chunk_cache.go')
-rw-r--r--weed/util/chunk_cache/chunk_cache.go111
1 files changed, 95 insertions, 16 deletions
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index e2676d9cc..682f5185a 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -1,36 +1,115 @@
package chunk_cache
import (
- "time"
+ "fmt"
+ "path"
+ "sort"
+ "sync"
- "github.com/karlseguin/ccache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// a global cache for recently accessed file chunks
type ChunkCache struct {
- cache *ccache.Cache
+ memCache *ChunkCacheInMemory
+ diskCaches []*ChunkCacheVolume
+ sync.RWMutex
}
-func NewChunkCache(maxEntries int64) *ChunkCache {
- pruneCount := maxEntries >> 3
- if pruneCount <= 0 {
- pruneCount = 500
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
+ c := &ChunkCache{
+ memCache: NewChunkCacheInMemory(maxEntries),
}
- return &ChunkCache{
- cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+
+ volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+ if volumeCount < segmentCount {
+ volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+ }
+
+ for i := 0; i < volumeCount; i++ {
+ fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ if err != nil {
+ glog.Errorf("failed to add cache %s : %v", fileName, err)
+ } else {
+ c.diskCaches = append(c.diskCaches, diskCache)
+ }
}
+
+ // keep newest cache to the front
+ sort.Slice(c.diskCaches, func(i, j int) bool {
+ return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ })
+
+ return c
}
-func (c *ChunkCache) GetChunk(fileId string) []byte {
- item := c.cache.Get(fileId)
- if item == nil {
+func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+ c.RLock()
+ defer c.RUnlock()
+
+ if data = c.memCache.GetChunk(fileId); data != nil {
+ return data
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
return nil
}
- data := item.Value().([]byte)
- item.Extend(time.Hour)
- return data
+ for _, diskCache := range c.diskCaches {
+ data, err = diskCache.GetNeedle(fid.Key)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
+ continue
+ }
+ if len(data) != 0 {
+ return
+ }
+ }
+ return nil
}
func (c *ChunkCache) SetChunk(fileId string, data []byte) {
- c.cache.Set(fileId, data, time.Hour)
+ c.Lock()
+ defer c.Unlock()
+
+ c.memCache.SetChunk(fileId, data)
+
+ if len(c.diskCaches) == 0 {
+ return
+ }
+
+ if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+ t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+ if resetErr != nil {
+ glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+ return
+ }
+ for i := len(c.diskCaches) - 1; i > 0; i-- {
+ c.diskCaches[i] = c.diskCaches[i-1]
+ }
+ c.diskCaches[0] = t
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return
+ }
+ c.diskCaches[0].WriteNeedle(fid.Key, data)
+
}
+
+func (c *ChunkCache) Shutdown() {
+ c.Lock()
+ defer c.Unlock()
+ for _, diskCache := range c.diskCaches {
+ diskCache.Shutdown()
+ }
+} \ No newline at end of file