-rw-r--r--  weed/command/mount.go                               10
-rw-r--r--  weed/command/mount_std.go                            6
-rw-r--r--  weed/command/webdav.go                               7
-rw-r--r--  weed/filesys/wfs.go                                 31
-rw-r--r--  weed/server/webdav_server.go                         9
-rw-r--r--  weed/util/chunk_cache/chunk_cache.go               111
-rw-r--r--  weed/util/chunk_cache/chunk_cache_in_memory.go      36
-rw-r--r--  weed/util/chunk_cache/chunk_cache_on_disk.go       145
-rw-r--r--  weed/util/chunk_cache/chunk_cache_on_disk_test.go   58
9 files changed, 378 insertions(+), 35 deletions(-)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index adf384a6f..6165402b4 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,5 +1,9 @@
package command
+import (
+ "os"
+)
+
type MountOptions struct {
filer *string
filerMountRootPath *string
@@ -9,7 +13,8 @@ type MountOptions struct {
replication *string
ttlSec *int
chunkSizeLimitMB *int
- chunkCacheCountLimit *int64
+ cacheDir *string
+ cacheSizeMB *int64
dataCenter *string
allowOthers *bool
umaskString *string
@@ -33,7 +38,8 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
- mountOptions.chunkCacheCountLimit = cmdMount.Flag.Int64("chunkCacheCountLimit", 1000, "number of file chunks to cache in memory")
+ mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
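The two new flags together replace chunkCacheCountLimit: rather than counting in-memory chunks, the mount now takes a directory and a total disk budget. A hypothetical invocation (cacheDir and cacheCapacityMB as defined above; -filer and -dir are pre-existing mount flags):

    weed mount -filer=localhost:8888 -dir=/mnt/weed -cacheDir=/tmp -cacheCapacityMB=1000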
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 148540dec..0f87d6aee 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -129,7 +129,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
options = append(options, osSpecificMountOptions()...)
-
if *option.allowOthers {
options = append(options, fuse.AllowOther())
}
@@ -137,12 +136,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options = append(options, fuse.AllowNonEmptyMount())
}
+ // mount
c, err := fuse.Mount(dir, options...)
if err != nil {
glog.V(0).Infof("mount: %v", err)
return true
}
-
defer fuse.Unmount(dir)
util.OnInterrupt(func() {
@@ -164,7 +163,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
- ChunkCacheCountLimit: *option.chunkCacheCountLimit,
+ CacheDir: *option.cacheDir,
+ CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
DirListCacheLimit: *option.dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 4f5d5f5ce..a1616d0fc 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
+ "os"
"os/user"
"strconv"
"time"
@@ -26,6 +27,8 @@ type WebDavOption struct {
collection *string
tlsPrivateKey *string
tlsCertificate *string
+ cacheDir *string
+ cacheSizeMB *int64
}
func init() {
@@ -35,6 +38,8 @@ func init() {
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
+ webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
}
var cmdWebDav = &Command{
@@ -105,6 +110,8 @@ func (wo *WebDavOption) startWebDav() bool {
Uid: uid,
Gid: gid,
Cipher: cipher,
+ CacheDir: *wo.cacheDir,
+ CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 49db18b6e..b2f68c030 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -22,18 +22,19 @@ import (
)
type Option struct {
- FilerGrpcAddress string
- GrpcDialOption grpc.DialOption
- FilerMountRootPath string
- Collection string
- Replication string
- TtlSec int32
- ChunkSizeLimit int64
- ChunkCacheCountLimit int64
- DataCenter string
- DirListCacheLimit int64
- EntryCacheTtl time.Duration
- Umask os.FileMode
+ FilerGrpcAddress string
+ GrpcDialOption grpc.DialOption
+ FilerMountRootPath string
+ Collection string
+ Replication string
+ TtlSec int32
+ ChunkSizeLimit int64
+ CacheDir string
+ CacheSizeMB int64
+ DataCenter string
+ DirListCacheLimit int64
+ EntryCacheTtl time.Duration
+ Umask os.FileMode
MountUid uint32
MountGid uint32
@@ -72,6 +73,10 @@ type statsCache struct {
}
func NewSeaweedFileSystem(option *Option) *WFS {
+ chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
+ util.OnInterrupt(func() {
+ chunkCache.Shutdown()
+ })
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
@@ -81,7 +86,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return make([]byte, option.ChunkSizeLimit)
},
},
- chunkCache: chunk_cache.NewChunkCache(option.ChunkCacheCountLimit),
+ chunkCache: chunkCache,
}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
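Both the FUSE mount (here) and the WebDAV server (next file) now build the cache the same way. A sketch of the shared pattern, with the hard-coded values from this commit annotated (the comments are mine, not in the diff):

    // memory tier: up to 256 chunks; disk tier: CacheSizeMB split across
    // at least 4 volume files named cache_0..cache_N under CacheDir
    chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
    util.OnInterrupt(func() {
        chunkCache.Shutdown() // close the .dat files and needle maps cleanly
    })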
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 082755291..affc953bc 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -34,6 +34,8 @@ type WebDavOption struct {
Uid uint32
Gid uint32
Cipher bool
+ CacheDir string
+ CacheSizeMB int64
}
type WebDavServer struct {
@@ -96,9 +98,14 @@ type WebDavFile struct {
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
+
+ chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
+ util.OnInterrupt(func() {
+ chunkCache.Shutdown()
+ })
return &WebDavFileSystem{
option: option,
- chunkCache: chunk_cache.NewChunkCache(1000),
+ chunkCache: chunkCache,
}, nil
}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index e2676d9cc..682f5185a 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -1,36 +1,115 @@
package chunk_cache
import (
- "time"
+ "fmt"
+ "path"
+ "sort"
+ "sync"
- "github.com/karlseguin/ccache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// a global cache for recently accessed file chunks
type ChunkCache struct {
- cache *ccache.Cache
+ memCache *ChunkCacheInMemory
+ diskCaches []*ChunkCacheVolume
+ sync.RWMutex
}
-func NewChunkCache(maxEntries int64) *ChunkCache {
- pruneCount := maxEntries >> 3
- if pruneCount <= 0 {
- pruneCount = 500
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
+ c := &ChunkCache{
+ memCache: NewChunkCacheInMemory(maxEntries),
}
- return &ChunkCache{
- cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+
+ volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+ if volumeCount < segmentCount {
+ volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+ }
+
+ for i := 0; i < volumeCount; i++ {
+ fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ if err != nil {
+ glog.Errorf("failed to add cache %s : %v", fileName, err)
+ } else {
+ c.diskCaches = append(c.diskCaches, diskCache)
+ }
}
+
+ // keep the newest cache volume at the front
+ sort.Slice(c.diskCaches, func(i, j int) bool {
+ return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ })
+
+ return c
}
-func (c *ChunkCache) GetChunk(fileId string) []byte {
- item := c.cache.Get(fileId)
- if item == nil {
+func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
+ c.RLock()
+ defer c.RUnlock()
+
+ if data = c.memCache.GetChunk(fileId); data != nil {
+ return data
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
return nil
}
- data := item.Value().([]byte)
- item.Extend(time.Hour)
- return data
+ for _, diskCache := range c.diskCaches {
+ data, err = diskCache.GetNeedle(fid.Key)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
+ continue
+ }
+ if len(data) != 0 {
+ return
+ }
+ }
+ return nil
}
func (c *ChunkCache) SetChunk(fileId string, data []byte) {
- c.cache.Set(fileId, data, time.Hour)
+ c.Lock()
+ defer c.Unlock()
+
+ c.memCache.SetChunk(fileId, data)
+
+ if len(c.diskCaches) == 0 {
+ return
+ }
+
+ if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+ t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+ if resetErr != nil {
+ glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+ return
+ }
+ for i := len(c.diskCaches) - 1; i > 0; i-- {
+ c.diskCaches[i] = c.diskCaches[i-1]
+ }
+ c.diskCaches[0] = t
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return
+ }
+ if err := c.diskCaches[0].WriteNeedle(fid.Key, data); err != nil {
+ glog.Errorf("failed to write cache file %s: %v", c.diskCaches[0].fileName, err)
+ }
+
}
+
+func (c *ChunkCache) Shutdown() {
+ c.Lock()
+ defer c.Unlock()
+ for _, diskCache := range c.diskCaches {
+ diskCache.Shutdown()
+ }
+}
\ No newline at end of file
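Two behaviors in the rewritten ChunkCache are easy to miss in the diff. Lookups fall through from the memory tier to the disk volumes, newest first; writes always append to diskCaches[0], and when that volume would exceed its size limit the oldest volume (the last slot) is Reset() and rotated to the front, so eviction is whole-volume FIFO rather than per-entry LRU. The volume sizing rule is also worth spelling out: the disk budget is carved into 30000MB (~30GB) volumes, but never fewer than segmentCount, in which case each volume shrinks to an equal share. A self-contained sketch of that arithmetic (planVolumes is an illustrative name, not part of the commit):

    package main

    import "fmt"

    // planVolumes mirrors the sizing logic in NewChunkCache: prefer 30000MB
    // volumes, but guarantee at least segmentCount of them, splitting the
    // budget evenly when it is too small for full-size volumes.
    func planVolumes(diskSizeMB int64, segmentCount int) (count int, sizeMB int64) {
        count, sizeMB = int(diskSizeMB/30000), int64(30000)
        if count < segmentCount {
            count, sizeMB = segmentCount, diskSizeMB/int64(segmentCount)
        }
        return
    }

    func main() {
        fmt.Println(planVolumes(1000, 4))   // 4 volumes of 250MB each
        fmt.Println(planVolumes(150000, 4)) // 5 volumes of 30000MB each
    }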
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
new file mode 100644
index 000000000..931e45e9a
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -0,0 +1,36 @@
+package chunk_cache
+
+import (
+ "time"
+
+ "github.com/karlseguin/ccache"
+)
+
+// an in-memory cache for recently accessed file chunks
+type ChunkCacheInMemory struct {
+ cache *ccache.Cache
+}
+
+func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
+ pruneCount := maxEntries >> 3
+ if pruneCount <= 0 {
+ pruneCount = 500
+ }
+ return &ChunkCacheInMemory{
+ cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+ }
+}
+
+func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return nil
+ }
+ data := item.Value().([]byte)
+ item.Extend(time.Hour)
+ return data
+}
+
+func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
+ c.cache.Set(fileId, data, time.Hour)
+}
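The memory tier keeps ccache's TTL behavior: each hit re-extends the entry by an hour, so hot chunks stay resident while cold ones age out. A minimal usage sketch as an external example test (the file id value is made up):

    package chunk_cache_test

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
    )

    func ExampleChunkCacheInMemory() {
        c := chunk_cache.NewChunkCacheInMemory(1024)    // hold at most 1024 chunks
        c.SetChunk("3,01637037d6", []byte("hello"))     // cached with a one-hour TTL
        fmt.Println(string(c.GetChunk("3,01637037d6"))) // a hit extends the TTL
        // Output: hello
    }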
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
new file mode 100644
index 000000000..2c7ef8d39
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -0,0 +1,145 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/syndtr/goleveldb/leveldb/opt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// This implements an on-disk cache.
+// The entries form a FIFO with a size limit.
+
+type ChunkCacheVolume struct {
+ DataBackend backend.BackendStorageFile
+ nm storage.NeedleMapper
+ fileName string
+ smallBuffer []byte
+ sizeLimit int64
+ lastModTime time.Time
+ fileSize int64
+}
+
+func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
+
+ v := &ChunkCacheVolume{
+ smallBuffer: make([]byte, types.NeedlePaddingSize),
+ fileName: fileName,
+ sizeLimit: preallocate,
+ }
+
+ var err error
+
+ if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
+ if !canRead {
+ return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
+ }
+ if !canWrite {
+ return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
+ }
+ if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ } else {
+ v.DataBackend = backend.NewDiskFile(dataFile)
+ v.lastModTime = modTime
+ v.fileSize = fileSize
+ }
+ } else {
+ if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ }
+ v.lastModTime = time.Now()
+ }
+
+ var indexFile *os.File
+ if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
+ }
+
+ glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
+ return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
+ }
+
+ return v, nil
+
+}
+
+func (v *ChunkCacheVolume) Shutdown() {
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ v.DataBackend = nil
+ }
+ if v.nm != nil {
+ v.nm.Close()
+ v.nm = nil
+ }
+}
+
+func (v *ChunkCacheVolume) destroy() {
+ v.Shutdown()
+ os.Remove(v.fileName + ".dat")
+ os.Remove(v.fileName + ".idx")
+ os.RemoveAll(v.fileName + ".ldb")
+}
+
+func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
+ v.destroy()
+ return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
+}
+
+func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
+
+ nv, ok := v.nm.Get(key)
+ if !ok {
+ return nil, storage.ErrorNotFound
+ }
+ data := make([]byte, nv.Size)
+ if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
+ return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
+ } else {
+ if readSize != int(nv.Size) {
+ return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
+ }
+ }
+
+ return data, nil
+}
+
+func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
+
+ offset := v.fileSize
+
+ written, err := v.DataBackend.WriteAt(data, offset)
+ if err != nil {
+ return err
+ } else if written != len(data) {
+ return fmt.Errorf("partial written %d, expected %d", written, len(data))
+ }
+
+ v.fileSize += int64(written)
+ extraSize := written % types.NeedlePaddingSize
+ if extraSize != 0 {
+ if _, err := v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)); err != nil {
+ return fmt.Errorf("failed to pad %s.dat: %v", v.fileName, err)
+ }
+ v.fileSize += int64(types.NeedlePaddingSize - extraSize)
+ }
+
+ if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", key, err)
+ }
+
+ return nil
+}
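WriteNeedle appends and then zero-pads each entry to the needle alignment, so every stored offset stays expressible as offset/NeedlePaddingSize in the index. A tiny self-contained sketch of the arithmetic (paddedSize is an illustrative helper; needlePaddingSize mirrors types.NeedlePaddingSize, 8 bytes at the time of this commit):

    package main

    import "fmt"

    const needlePaddingSize = 8 // assumption: matches types.NeedlePaddingSize

    // paddedSize is how far a write of n data bytes advances the .dat file:
    // the data plus zero padding up to the next 8-byte boundary.
    func paddedSize(n int) int {
        if extra := n % needlePaddingSize; extra != 0 {
            return n + needlePaddingSize - extra
        }
        return n
    }

    func main() {
        fmt.Println(paddedSize(13)) // 16
        fmt.Println(paddedSize(16)) // 16
    }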
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
new file mode 100644
index 000000000..256b10139
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -0,0 +1,58 @@
+package chunk_cache
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+)
+
+func TestOnDisk(t *testing.T) {
+
+ tmpDir, _ := ioutil.TempDir("", "c")
+ defer os.RemoveAll(tmpDir)
+
+ totalDiskSizeMb := int64(6)
+ segmentCount := 2
+
+ cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)
+
+ writeCount := 5
+ type test_data struct {
+ data []byte
+ fileId string
+ }
+ testData := make([]*test_data, writeCount)
+ for i:=0;i<writeCount;i++{
+ buff := make([]byte, 1024*1024)
+ rand.Read(buff)
+ testData[i] = &test_data{
+ data: buff,
+ fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+ }
+ cache.SetChunk(testData[i].fileId, testData[i].data)
+ }
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId)
+ if !bytes.Equal(data, testData[i].data) {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+ cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId)
+ if !bytes.Equal(data, testData[i].data) {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+}
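The test writes five 1MB chunks into a 6MB budget split across two 3MB volumes, which forces one whole-volume rotation, then reopens the cache from the same directory to confirm the .dat/.idx/.ldb files reload across restarts. To run it in isolation with the standard toolchain:

    go test ./weed/util/chunk_cache/ -run TestOnDisk -v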