diff options
Diffstat (limited to 'weed/util/chunk_cache/chunk_cache.go')
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 111 |
1 files changed, 95 insertions, 16 deletions
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index e2676d9cc..682f5185a 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -1,36 +1,115 @@ package chunk_cache import ( - "time" + "fmt" + "path" + "sort" + "sync" - "github.com/karlseguin/ccache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // a global cache for recently accessed file chunks type ChunkCache struct { - cache *ccache.Cache + memCache *ChunkCacheInMemory + diskCaches []*ChunkCacheVolume + sync.RWMutex } -func NewChunkCache(maxEntries int64) *ChunkCache { - pruneCount := maxEntries >> 3 - if pruneCount <= 0 { - pruneCount = 500 +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), } - return &ChunkCache{ - cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("cache_%d", i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c } -func (c *ChunkCache) GetChunk(fileId string) []byte { - item := c.cache.Get(fileId) - if item == nil { +func (c *ChunkCache) GetChunk(fileId string) (data []byte) { + c.RLock() + defer c.RUnlock() + + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) return nil } - data := item.Value().([]byte) - item.Extend(time.Hour) - return data + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(fid.Key) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) + continue + } + if len(data) != 0 { + return + } + } + return nil } func (c *ChunkCache) SetChunk(fileId string, data []byte) { - c.cache.Set(fileId, data, time.Hour) + c.Lock() + defer c.Unlock() + + c.memCache.SetChunk(fileId, data) + + if len(c.diskCaches) == 0 { + return + } + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + c.diskCaches[0].WriteNeedle(fid.Key, data) + } + +func (c *ChunkCache) Shutdown() { + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } +}
\ No newline at end of file |
