aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorchrislusf <chris.lu@gmail.com>2016-04-14 01:30:26 -0700
committerchrislusf <chris.lu@gmail.com>2016-04-14 01:30:26 -0700
commit0649d778a72d4c0c27b3f2049fdb6f6e18956a65 (patch)
treefd8f58c8639aba6038cf6bef0d382fc806a8b48c /go
parent95e0d2f1b236f97f70d51a62c3df6a937a27286d (diff)
downloadseaweedfs-0649d778a72d4c0c27b3f2049fdb6f6e18956a65.tar.xz
seaweedfs-0649d778a72d4c0c27b3f2049fdb6f6e18956a65.zip
pooling []byte
reduce the number of requests to make([]byte)
Diffstat (limited to 'go')
-rw-r--r--go/storage/needle.go11
-rw-r--r--go/storage/needle_read_write.go26
-rw-r--r--go/storage/volume.go3
-rw-r--r--go/util/bytes_pool.go127
-rw-r--r--go/util/bytes_pool_test.go41
-rw-r--r--go/weed/signal_handling.go3
-rw-r--r--go/weed/weed_server/volume_server_handlers_read.go1
-rw-r--r--go/weed/weed_server/volume_server_handlers_sync.go3
-rw-r--r--go/weed/weed_server/volume_server_handlers_write.go8
9 files changed, 216 insertions, 7 deletions
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 6db99b7df..612a89fed 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/images"
"github.com/chrislusf/seaweedfs/go/operation"
+ "github.com/chrislusf/seaweedfs/go/util"
)
const (
@@ -22,6 +23,14 @@ const (
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
)
+var (
+ BYTESPOOL *util.BytesPool
+)
+
+func init() {
+ BYTESPOOL = util.NewBytesPool()
+}
+
/*
* A Needle means a uploaded and stored file.
* Needle file size is limited to 4GB for now.
@@ -43,6 +52,8 @@ type Needle struct {
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
+
+ rawBytes []byte // underlying supporing []byte, fetched and released into a pool
}
func (n *Needle) String() (str string) {
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index ccfe1d498..8d051dea3 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -136,15 +136,33 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func ReadNeedleBlob(r *os.File, offset int64, size uint32) (bytes []byte, err error) {
+func ReleaseBytes(b []byte) {
+ // println("Releasing", len(b))
+ BYTESPOOL.Put(b)
+}
+
+func BorrwoBytes(size int) []byte {
+ ret := BYTESPOOL.Get(size)
+ // println("Reading", len(ret))
+ return ret
+}
+
+func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
- bytes = make([]byte, NeedleHeaderSize+size+NeedleChecksumSize+padding)
- _, err = r.ReadAt(bytes, offset)
+ readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
+ rawBytes = BorrwoBytes(int(readSize))
+ dataSlice = rawBytes[0:int(readSize)]
+ _, err = r.ReadAt(dataSlice, offset)
return
}
+func (n *Needle) ReleaseMemory() {
+ ReleaseBytes(n.rawBytes)
+}
+
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
- bytes, err := ReadNeedleBlob(r, offset, size)
+ bytes, rawBytes, err := ReadNeedleBlob(r, offset, size)
+ n.rawBytes = rawBytes
if err != nil {
return err
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 5c6b12e9b..af552a10f 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -159,6 +159,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
if ok && nv.Offset > 0 {
oldNeedle := new(Needle)
err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ defer oldNeedle.ReleaseMemory()
if err != nil {
glog.V(0).Infof("Failed to check updated file %v", err)
return false
@@ -288,6 +289,7 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
}
err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
+ n.ReleaseMemory()
return 0, err
}
bytesRead := len(n.Data)
@@ -304,6 +306,7 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
return bytesRead, nil
}
+ n.ReleaseMemory()
return -1, errors.New("Not Found")
}
diff --git a/go/util/bytes_pool.go b/go/util/bytes_pool.go
new file mode 100644
index 000000000..58ed6feca
--- /dev/null
+++ b/go/util/bytes_pool.go
@@ -0,0 +1,127 @@
+package util
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+var (
+ ChunkSizes = []int{
+ 1 << 4, // index 0, 16 bytes, inclusive
+ 1 << 6, // index 1, 64 bytes
+ 1 << 8, // index 2, 256 bytes
+ 1 << 10, // index 3, 1K bytes
+ 1 << 12, // index 4, 4K bytes
+ 1 << 14, // index 5, 16K bytes
+ 1 << 16, // index 6, 64K bytes
+ 1 << 18, // index 7, 256K bytes
+ 1 << 20, // index 8, 1M bytes
+ 1 << 22, // index 9, 4M bytes
+ 1 << 24, // index 10, 16M bytes
+ 1 << 26, // index 11, 64M bytes
+ 1 << 28, // index 12, 128M bytes
+ }
+
+ _DEBUG = false
+)
+
+type BytesPool struct {
+ chunkPools []*byteChunkPool
+}
+
+func NewBytesPool() *BytesPool {
+ var bp BytesPool
+ for _, size := range ChunkSizes {
+ bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size))
+ }
+ ret := &bp
+ if _DEBUG {
+ t := time.NewTicker(10 * time.Second)
+ go func() {
+ for {
+ println("buffer:", ret.String())
+ <-t.C
+ }
+ }()
+ }
+ return ret
+}
+
+func (m *BytesPool) String() string {
+ var buf bytes.Buffer
+ for index, size := range ChunkSizes {
+ if m.chunkPools[index].count > 0 {
+ buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count))
+ }
+ }
+ return buf.String()
+}
+
+func findChunkPoolIndex(size int) int {
+ if size <= 0 {
+ return -1
+ }
+ size = (size - 1) >> 4
+ ret := 0
+ for size > 0 {
+ size = size >> 2
+ ret = ret + 1
+ }
+ if ret >= len(ChunkSizes) {
+ return -1
+ }
+ return ret
+}
+
+func (m *BytesPool) Get(size int) []byte {
+ index := findChunkPoolIndex(size)
+ // println("get index:", index)
+ if index < 0 {
+ return make([]byte, size)
+ }
+ return m.chunkPools[index].Get()
+}
+
+func (m *BytesPool) Put(b []byte) {
+ index := findChunkPoolIndex(len(b))
+ // println("put index:", index)
+ if index < 0 {
+ return
+ }
+ m.chunkPools[index].Put(b)
+}
+
+// a pool of fix-sized []byte chunks. The pool size is managed by Go GC
+type byteChunkPool struct {
+ sync.Pool
+ chunkSizeLimit int
+ count int64
+}
+
+var count int
+
+func newByteChunkPool(chunkSizeLimit int) *byteChunkPool {
+ var m byteChunkPool
+ m.chunkSizeLimit = chunkSizeLimit
+ m.Pool.New = func() interface{} {
+ count++
+ // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
+ return make([]byte, m.chunkSizeLimit)
+ }
+ return &m
+}
+
+func (m *byteChunkPool) Get() []byte {
+ // println("before get size:", m.chunkSizeLimit, "count:", m.count)
+ atomic.AddInt64(&m.count, 1)
+ return m.Pool.Get().([]byte)
+}
+
+func (m *byteChunkPool) Put(b []byte) {
+ atomic.AddInt64(&m.count, -1)
+ // println("after put get size:", m.chunkSizeLimit, "count:", m.count)
+ m.Pool.Put(b)
+}
diff --git a/go/util/bytes_pool_test.go b/go/util/bytes_pool_test.go
new file mode 100644
index 000000000..3f37c16cf
--- /dev/null
+++ b/go/util/bytes_pool_test.go
@@ -0,0 +1,41 @@
+package util
+
+import (
+ "testing"
+)
+
+func TestTTLReadWrite(t *testing.T) {
+ var tests = []struct {
+ n int // input
+ expected int // expected result
+ }{
+ {0, -1},
+ {1, 0},
+ {1 << 4, 0},
+ {1 << 6, 1},
+ {1 << 8, 2},
+ {1 << 10, 3},
+ {1 << 12, 4},
+ {1 << 14, 5},
+ {1 << 16, 6},
+ {1 << 18, 7},
+ {1<<4 + 1, 1},
+ {1<<6 + 1, 2},
+ {1<<8 + 1, 3},
+ {1<<10 + 1, 4},
+ {1<<12 + 1, 5},
+ {1<<14 + 1, 6},
+ {1<<16 + 1, 7},
+ {1<<18 + 1, 8},
+ {1<<28 - 1, 12},
+ {1 << 28, 12},
+ {1<<28 + 2134, -1},
+ {1080, 4},
+ }
+ for _, tt := range tests {
+ actual := findChunkPoolIndex(tt.n)
+ if actual != tt.expected {
+ t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual)
+ }
+ }
+}
diff --git a/go/weed/signal_handling.go b/go/weed/signal_handling.go
index 2004bb088..a8f166382 100644
--- a/go/weed/signal_handling.go
+++ b/go/weed/signal_handling.go
@@ -20,7 +20,8 @@ func OnInterrupt(fn func()) {
// syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
- syscall.SIGQUIT)
+ // syscall.SIGQUIT,
+ )
go func() {
for _ = range signalChan {
fn()
diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go
index 52b22426f..6ce648062 100644
--- a/go/weed/weed_server/volume_server_handlers_read.go
+++ b/go/weed/weed_server/volume_server_handlers_read.go
@@ -66,6 +66,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
cookie := n.Cookie
count, e := vs.store.ReadVolumeNeedle(volumeId, n)
glog.V(4).Infoln("read bytes", count, "error", e)
+ defer n.ReleaseMemory()
if e != nil || count <= 0 {
glog.V(0).Infoln("read error:", e, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go
index c650e5f53..cb24f7cd6 100644
--- a/go/weed/weed_server/volume_server_handlers_sync.go
+++ b/go/weed/weed_server/volume_server_handlers_sync.go
@@ -50,7 +50,8 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
}
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0))
- content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
+ content, rawBytes, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
+ defer storage.ReleaseBytes(rawBytes)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go
index 3d2afaf77..58733ea11 100644
--- a/go/weed/weed_server/volume_server_handlers_write.go
+++ b/go/weed/weed_server/volume_server_handlers_write.go
@@ -55,7 +55,9 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
cookie := n.Cookie
- if _, ok := vs.store.ReadVolumeNeedle(volumeId, n); ok != nil {
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n)
+ defer n.ReleaseMemory()
+ if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
writeJsonQuiet(w, r, http.StatusNotFound, m)
@@ -120,6 +122,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Status: http.StatusNotFound,
Error: err.Error(),
})
+ n.ReleaseMemory()
continue
}
@@ -129,6 +132,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Status: http.StatusNotAcceptable,
Error: "ChunkManifest: not allowed in batch delete mode.",
})
+ n.ReleaseMemory()
continue
}
@@ -139,6 +143,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Error: "File Random Cookie does not match.",
})
glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ n.ReleaseMemory()
return
}
if size, err := vs.store.Delete(volumeId, n); err != nil {
@@ -154,6 +159,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Size: int(size)},
)
}
+ n.ReleaseMemory()
}
writeJsonQuiet(w, r, http.StatusAccepted, ret)