aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorchrislusf <chris.lu@gmail.com>2016-04-15 11:56:53 -0700
committerchrislusf <chris.lu@gmail.com>2016-04-15 11:56:53 -0700
commitb03e7b26b53c2c579ae13755c3fac47f67c6f40c (patch)
treee02bc29295ee90f5a9f33065175baa54353ed55b /go
parent3fb98a904bf5d67e7f51797fbee0aa7ddffb5903 (diff)
downloadseaweedfs-b03e7b26b53c2c579ae13755c3fac47f67c6f40c.tar.xz
seaweedfs-b03e7b26b53c2c579ae13755c3fac47f67c6f40c.zip
add []byte caching and pooling
fixes https://github.com/chrislusf/seaweedfs/issues/211
Diffstat (limited to 'go')
-rw-r--r--go/storage/needle.go11
-rw-r--r--go/storage/needle_byte_cache.go70
-rw-r--r--go/storage/needle_read_write.go31
-rw-r--r--go/weed/weed_server/volume_server_handlers_sync.go4
4 files changed, 82 insertions, 34 deletions
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 612a89fed..8ab76c0f3 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -13,7 +13,6 @@ import (
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/images"
"github.com/chrislusf/seaweedfs/go/operation"
- "github.com/chrislusf/seaweedfs/go/util"
)
const (
@@ -23,14 +22,6 @@ const (
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
)
-var (
- BYTESPOOL *util.BytesPool
-)
-
-func init() {
- BYTESPOOL = util.NewBytesPool()
-}
-
/*
* A Needle means a uploaded and stored file.
* Needle file size is limited to 4GB for now.
@@ -53,7 +44,7 @@ type Needle struct {
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
- rawBytes []byte // underlying supporing []byte, fetched and released into a pool
+ rawBlock *Block // underlying supporing []byte, fetched and released into a pool
}
func (n *Needle) String() (str string) {
diff --git a/go/storage/needle_byte_cache.go b/go/storage/needle_byte_cache.go
new file mode 100644
index 000000000..c7781917e
--- /dev/null
+++ b/go/storage/needle_byte_cache.go
@@ -0,0 +1,70 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/hashicorp/golang-lru"
+
+ "github.com/chrislusf/seaweedfs/go/util"
+)
+
+var (
+ bytesCache *lru.Cache
+ bytesPool *util.BytesPool
+)
+
+/*
+There are one level of caching, and one level of pooling.
+
+In pooling, all []byte are fetched and returned to the pool bytesPool.
+
+In caching, the string~[]byte mapping is cached, to
+*/
+func init() {
+ bytesPool = util.NewBytesPool()
+ bytesCache, _ = lru.NewWithEvict(1, func(key interface{}, value interface{}) {
+ value.(*Block).decreaseReference()
+ })
+}
+
+type Block struct {
+ Bytes []byte
+ refCount int32
+}
+
+func (block *Block) decreaseReference() {
+ if atomic.AddInt32(&block.refCount, -1) == 0 {
+ bytesPool.Put(block.Bytes)
+ }
+}
+func (block *Block) increaseReference() {
+ atomic.AddInt32(&block.refCount, 1)
+}
+
+// get bytes from the LRU cache of []byte first, then from the bytes pool
+// when []byte in LRU cache is evicted, it will be put back to the bytes pool
+func getBytesForFileBlock(r *os.File, offset int64, readSize int) (block *Block, isNew bool) {
+ // check cache, return if found
+ cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
+ if obj, found := bytesCache.Get(cacheKey); found {
+ block = obj.(*Block)
+ block.increaseReference()
+ return block, false
+ }
+
+ // get the []byte from pool
+ b := bytesPool.Get(readSize)
+ // refCount = 2, one by the bytesCache, one by the actual needle object
+ block = &Block{Bytes: b, refCount: 2}
+ bytesCache.Add(cacheKey, block)
+ return block, true
+}
+
+func (n *Needle) ReleaseMemory() {
+ n.rawBlock.decreaseReference()
+}
+func ReleaseBytes(b []byte) {
+ bytesPool.Put(b)
+}
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 8d051dea3..e2be256be 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -136,33 +136,20 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func ReleaseBytes(b []byte) {
- // println("Releasing", len(b))
- BYTESPOOL.Put(b)
-}
-
-func BorrwoBytes(size int) []byte {
- ret := BYTESPOOL.Get(size)
- // println("Reading", len(ret))
- return ret
-}
-
-func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) {
+func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
- rawBytes = BorrwoBytes(int(readSize))
- dataSlice = rawBytes[0:int(readSize)]
- _, err = r.ReadAt(dataSlice, offset)
- return
-}
-
-func (n *Needle) ReleaseMemory() {
- ReleaseBytes(n.rawBytes)
+ block, isNew := getBytesForFileBlock(r, offset, int(readSize))
+ dataSlice = block.Bytes[0:int(readSize)]
+ if isNew {
+ _, err = r.ReadAt(dataSlice, offset)
+ }
+ return dataSlice, block, err
}
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
- bytes, rawBytes, err := ReadNeedleBlob(r, offset, size)
- n.rawBytes = rawBytes
+ bytes, block, err := ReadNeedleBlob(r, offset, size)
+ n.rawBlock = block
if err != nil {
return err
}
diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go
index cb24f7cd6..c52c93bd2 100644
--- a/go/weed/weed_server/volume_server_handlers_sync.go
+++ b/go/weed/weed_server/volume_server_handlers_sync.go
@@ -50,8 +50,8 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
}
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0))
- content, rawBytes, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
- defer storage.ReleaseBytes(rawBytes)
+ content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
+ defer storage.ReleaseBytes(block.Bytes)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return