diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/util/chunk_cache | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/util/chunk_cache')
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 113 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_in_memory.go | 36 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 145 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 59 | ||||
| -rw-r--r-- | weed/util/chunk_cache/on_disk_cache_layer.go | 89 |
5 files changed, 442 insertions, 0 deletions
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go new file mode 100644 index 000000000..e1d4b639f --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache.go @@ -0,0 +1,113 @@ +package chunk_cache + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +const ( + memCacheSizeLimit = 1024 * 1024 + onDiskCacheSizeLimit0 = memCacheSizeLimit + onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit +) + +// a global cache for recently accessed file chunks +type ChunkCache struct { + memCache *ChunkCacheInMemory + diskCaches []*OnDiskCacheLayer + sync.RWMutex +} + +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache { + + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), + } + c.diskCaches = make([]*OnDiskCacheLayer, 3) + c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4) + c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4) + c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4) + + return c +} + +func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { + if c == nil { + return + } + + c.RLock() + defer c.RUnlock() + + return c.doGetChunk(fileId, chunkSize) +} + +func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { + + if chunkSize < memCacheSizeLimit { + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return nil + } + + for _, diskCache := range c.diskCaches { + data := diskCache.getChunk(fid.Key) + if len(data) != 0 { + return data + } + } + + return nil + +} + +func (c *ChunkCache) SetChunk(fileId string, data []byte) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + + c.doSetChunk(fileId, data) +} + +func (c *ChunkCache) doSetChunk(fileId string, data []byte) { + + if len(data) < memCacheSizeLimit { + c.memCache.SetChunk(fileId, data) + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + + if len(data) < onDiskCacheSizeLimit0 { + c.diskCaches[0].setChunk(fid.Key, data) + } else if len(data) < onDiskCacheSizeLimit1 { + c.diskCaches[1].setChunk(fid.Key, data) + } else { + c.diskCaches[2].setChunk(fid.Key, data) + } + +} + +func (c *ChunkCache) Shutdown() { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.shutdown() + } +} diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..2c7ef8d39 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", key, err) + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..f061f2ba2 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,59 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(32) + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + size uint64 + } + testData := make([]*test_data, writeCount) + for i := 0; i < writeCount; i++ { + buff := make([]byte, 1024*1024) + rand.Read(buff) + testData[i] = &test_data{ + data: buff, + fileId: fmt.Sprintf("1,%daabbccdd", i+1), + size: uint64(len(buff)), + } + cache.SetChunk(testData[i].fileId, testData[i].data) + } + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + + cache = NewChunkCache(0, tmpDir, totalDiskSizeMb) + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + +} diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go new file mode 100644 index 000000000..9cf8e3ab2 --- /dev/null +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -0,0 +1,89 @@ +package chunk_cache + +import ( + "fmt" + "path" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type OnDiskCacheLayer struct { + diskCaches []*ChunkCacheVolume +} + +func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer { + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + c := &OnDiskCacheLayer{} + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } + } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c +} + +func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) { + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + c.diskCaches[0].WriteNeedle(needleId, data) + +} + +func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) { + + var err error + + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(needleId) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId) + continue + } + if len(data) != 0 { + return + } + } + + return nil + +} + +func (c *OnDiskCacheLayer) shutdown() { + + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } + +} |
