diff options
Diffstat (limited to 'weed/util/chunk_cache')
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 111 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_in_memory.go | 36 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 145 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 58 |
4 files changed, 334 insertions, 16 deletions
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index e2676d9cc..682f5185a 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -1,36 +1,115 @@ package chunk_cache import ( - "time" + "fmt" + "path" + "sort" + "sync" - "github.com/karlseguin/ccache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // a global cache for recently accessed file chunks type ChunkCache struct { - cache *ccache.Cache + memCache *ChunkCacheInMemory + diskCaches []*ChunkCacheVolume + sync.RWMutex } -func NewChunkCache(maxEntries int64) *ChunkCache { - pruneCount := maxEntries >> 3 - if pruneCount <= 0 { - pruneCount = 500 +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), } - return &ChunkCache{ - cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("cache_%d", i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c } -func (c *ChunkCache) GetChunk(fileId string) []byte { - item := c.cache.Get(fileId) - if item == nil { +func (c *ChunkCache) GetChunk(fileId string) (data []byte) { + c.RLock() + defer c.RUnlock() + + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) return nil } - data := item.Value().([]byte) - item.Extend(time.Hour) - return data + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(fid.Key) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) + continue + } + if len(data) != 0 { + return + } + } + return nil } func (c *ChunkCache) SetChunk(fileId string, data []byte) { - c.cache.Set(fileId, data, time.Hour) + c.Lock() + defer c.Unlock() + + c.memCache.SetChunk(fileId, data) + + if len(c.diskCaches) == 0 { + return + } + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + c.diskCaches[0].WriteNeedle(fid.Key, data) + } + +func (c *ChunkCache) Shutdown() { + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } +}
\ No newline at end of file diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..2c7ef8d39 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", key, err) + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..256b10139 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,58 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(6) + segmentCount := 2 + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + } + testData := make([]*test_data, writeCount) + for i:=0;i<writeCount;i++{ + buff := make([]byte, 1024*1024) + rand.Read(buff) + testData[i] = &test_data{ + data: buff, + fileId: fmt.Sprintf("1,%daabbccdd", i+1), + } + cache.SetChunk(testData[i].fileId, testData[i].data) + } + + for i:=0;i<writeCount;i++{ + data := cache.GetChunk(testData[i].fileId) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + + cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + + for i:=0;i<writeCount;i++{ + data := cache.GetChunk(testData[i].fileId) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + +} |
