author    Chris Lu <chrislusf@users.noreply.github.com>  2017-06-09 22:52:24 -0700
committer GitHub <noreply@github.com>  2017-06-09 22:52:24 -0700
commit    761c0eb1ed6cd7645d747726292ff2656d0d4671 (patch)
tree      7a7843d226d1aa68715b0a7d5812715dc715613c
parent    f5bed84340b8b96c8b134b849f302120d85708c6 (diff)
parent    5047bdb4a20f756f5d025be0788403dbb2db9523 (diff)
Merge pull request #508 from chrislusf/skip_memory_pool
skip bytes cache
-rw-r--r--  weed/command/server.go                        1
-rw-r--r--  weed/command/volume.go                        3
-rw-r--r--  weed/server/volume_server.go                  4
-rw-r--r--  weed/server/volume_server_handlers_read.go    1
-rw-r--r--  weed/server/volume_server_handlers_sync.go    5
-rw-r--r--  weed/server/volume_server_handlers_write.go   4
-rw-r--r--  weed/storage/needle.go                        2
-rw-r--r--  weed/storage/needle_byte_cache.go            75
-rw-r--r--  weed/storage/needle_read_write.go             5
-rw-r--r--  weed/storage/volume_read_write.go             2
-rw-r--r--  weed/storage/volume_vacuum.go                 3
-rw-r--r--  weed/util/bytes_pool.go                     127
-rw-r--r--  weed/util/bytes_pool_test.go                 41
13 files changed, 9 insertions, 264 deletions
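
In short, this merge removes the custom LRU byte cache and the size-classed bytes pool: ReadNeedleBlob now returns a plain []byte backed by a fresh allocation and a single ReadAt, so callers no longer call ReleaseMemory or ReleaseBytes, and buffer reuse is left to the Go garbage collector while file caching is left to the OS page cache. A minimal sketch of the read path after this change (hypothetical caller, not part of the repo; ReadNeedleBlob and NeedlePaddingSize are taken from the diffs below):

	package example // hypothetical caller, not part of the repo

	import (
		"os"

		"github.com/chrislusf/seaweedfs/weed/storage"
	)

	// readBlobExample reads one needle blob after this change: no pool, no
	// refcounting; the returned slice is an ordinary allocation owned by the caller.
	func readBlobExample(datFile *os.File, offset int64, size uint32) ([]byte, error) {
		return storage.ReadNeedleBlob(datFile, offset*storage.NeedlePaddingSize, size)
	}
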
diff --git a/weed/command/server.go b/weed/command/server.go
index 0c5070981..c8878e9eb 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -266,7 +266,6 @@ func runServer(cmd *Command, args []string) bool {
volumeNeedleMapKind,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
- *volumeEnableBytesCache,
)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
diff --git a/weed/command/volume.go b/weed/command/volume.go
index a4e316ecb..ad9803974 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -36,7 +36,6 @@ type VolumeServerOptions struct {
indexType *string
fixJpgOrientation *bool
readRedirect *bool
- enableBytesCache *bool
}
func init() {
@@ -55,7 +54,6 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
- v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.")
}
var cmdVolume = &Command{
@@ -136,7 +134,6 @@ func runVolume(cmd *Command, args []string) bool {
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
- *v.enableBytesCache,
)
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index cc06f0092..cace8d181 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -32,8 +32,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
- readRedirect bool,
- enableBytesCache bool) *VolumeServer {
+ readRedirect bool) *VolumeServer {
vs := &VolumeServer{
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
@@ -44,7 +43,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
- storage.EnableBytesCache = enableBytesCache
vs.guard = security.NewGuard(whiteList, "")
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 8f50b265e..9b0fee4eb 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -72,7 +72,6 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound)
return
}
- defer n.ReleaseMemory()
if n.Cookie != cookie {
glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
w.WriteHeader(http.StatusNotFound)
diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go
index 68c381e28..df1fde590 100644
--- a/weed/server/volume_server_handlers_sync.go
+++ b/weed/server/volume_server_handlers_sync.go
@@ -50,8 +50,7 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
}
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0))
- content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
- defer storage.ReleaseBytes(block.Bytes)
+ content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@@ -83,4 +82,4 @@ func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request)
}
return vid, err
-} \ No newline at end of file
+}
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index b02a58fc8..e45c2245c 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -64,7 +64,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusNotFound, m)
return
}
- defer n.ReleaseMemory()
if n.Cookie != cookie {
glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
@@ -133,7 +132,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Status: http.StatusNotAcceptable,
Error: "ChunkManifest: not allowed in batch delete mode.",
})
- n.ReleaseMemory()
continue
}
@@ -144,7 +142,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Error: "File Random Cookie does not match.",
})
glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
- n.ReleaseMemory()
return
}
if size, err := vs.store.Delete(volumeId, n); err != nil {
@@ -160,7 +157,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Size: int(size)},
)
}
- n.ReleaseMemory()
}
writeJsonQuiet(w, r, http.StatusAccepted, ret)
diff --git a/weed/storage/needle.go b/weed/storage/needle.go
index 29e70ff10..2ffaff4de 100644
--- a/weed/storage/needle.go
+++ b/weed/storage/needle.go
@@ -49,8 +49,6 @@ type Needle struct {
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
-
- rawBlock *Block // underlying supporing []byte, fetched and released into a pool
}
func (n *Needle) String() (str string) {
diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go
index dfc32bcbf..78c1ea862 100644
--- a/weed/storage/needle_byte_cache.go
+++ b/weed/storage/needle_byte_cache.go
@@ -1,80 +1,11 @@
package storage
import (
- "fmt"
"os"
- "sync/atomic"
-
- "github.com/hashicorp/golang-lru"
-
- "github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- EnableBytesCache = true
- bytesCache *lru.Cache
- bytesPool *util.BytesPool
-)
-
-/*
-There are one level of caching, and one level of pooling.
-
-In pooling, all []byte are fetched and returned to the pool bytesPool.
-
-In caching, the string~[]byte mapping is cached
-*/
-func init() {
- bytesPool = util.NewBytesPool()
- bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) {
- value.(*Block).decreaseReference()
- })
-}
-
-type Block struct {
- Bytes []byte
- refCount int32
-}
-
-func (block *Block) decreaseReference() {
- if atomic.AddInt32(&block.refCount, -1) == 0 {
- bytesPool.Put(block.Bytes)
- }
-}
-func (block *Block) increaseReference() {
- atomic.AddInt32(&block.refCount, 1)
-}
-
-// get bytes from the LRU cache of []byte first, then from the bytes pool
-// when []byte in LRU cache is evicted, it will be put back to the bytes pool
-func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
- // check cache, return if found
- cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
- if EnableBytesCache {
- if obj, found := bytesCache.Get(cacheKey); found {
- block = obj.(*Block)
- block.increaseReference()
- dataSlice = block.Bytes[0:readSize]
- return dataSlice, block, nil
- }
- }
-
- // get the []byte from pool
- b := bytesPool.Get(readSize)
- // refCount = 2, one by the bytesCache, one by the actual needle object
- block = &Block{Bytes: b, refCount: 2}
- dataSlice = block.Bytes[0:readSize]
+func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, err error) {
+ dataSlice = make([]byte, readSize)
_, err = r.ReadAt(dataSlice, offset)
- if EnableBytesCache {
- bytesCache.Add(cacheKey, block)
- }
- return dataSlice, block, err
-}
-
-func (n *Needle) ReleaseMemory() {
- if n.rawBlock != nil {
- n.rawBlock.decreaseReference()
- }
-}
-func ReleaseBytes(b []byte) {
- bytesPool.Put(b)
+ return dataSlice, err
}
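
For context, the removed design held each buffer in a Block with refCount = 2 (one reference owned by the LRU cache, one by the Needle), and every reader had to drop its reference explicitly or the buffer never went back to the pool. A rough sketch of the old calling discipline that this change makes unnecessary, assuming the pre-change API shown above (n.ReleaseMemory decrementing the block's refCount):

	package example // pre-change sketch, not part of the repo

	import (
		"os"

		"github.com/chrislusf/seaweedfs/weed/storage"
	)

	// readNeedleOld shows the discipline the old API required: every ReadData
	// had to be paired with ReleaseMemory so the pooled buffer was handed back
	// (when the refCount reaches zero the block returns to the bytes pool).
	func readNeedleOld(datFile *os.File, offset int64, size uint32, version storage.Version) error {
		n := new(storage.Needle)
		if err := n.ReadData(datFile, offset, size, version); err != nil {
			return err
		}
		defer n.ReleaseMemory() // drop the needle's reference on the pooled block
		_ = n.Data              // ... use the needle data here ...
		return nil
	}

After this change the same code simply omits the ReleaseMemory call, as the handler diffs above and below show.
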
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
index 4f03ce396..ee7cc6046 100644
--- a/weed/storage/needle_read_write.go
+++ b/weed/storage/needle_read_write.go
@@ -151,16 +151,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
+func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) {
return getBytesForFileBlock(r, offset, int(getActualSize(size)))
}
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
- bytes, block, err := ReadNeedleBlob(r, offset, size)
+ bytes, err := ReadNeedleBlob(r, offset, size)
if err != nil {
return err
}
- n.rawBlock = block
n.ParseNeedleHeader(bytes)
if n.Size != size {
return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 2314bc815..16d8b6d04 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -25,7 +25,6 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
glog.V(0).Infof("Failed to check updated file %v", err)
return false
}
- defer oldNeedle.ReleaseMemory()
if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
n.DataSize = oldNeedle.DataSize
return true
@@ -172,7 +171,6 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
return bytesRead, nil
}
- n.ReleaseMemory()
return -1, errors.New("Not Found")
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index f6f68d59b..22c117c41 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -178,7 +178,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
//even the needle cache in memory is hit, the need_bytes is correct
var needle_bytes []byte
- needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+ needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
if err != nil {
return
}
@@ -291,7 +291,6 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
n := new(Needle)
n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version())
- defer n.ReleaseMemory()
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
return nil
diff --git a/weed/util/bytes_pool.go b/weed/util/bytes_pool.go
deleted file mode 100644
index 58ed6feca..000000000
--- a/weed/util/bytes_pool.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package util
-
-import (
- "bytes"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-)
-
-var (
- ChunkSizes = []int{
- 1 << 4, // index 0, 16 bytes, inclusive
- 1 << 6, // index 1, 64 bytes
- 1 << 8, // index 2, 256 bytes
- 1 << 10, // index 3, 1K bytes
- 1 << 12, // index 4, 4K bytes
- 1 << 14, // index 5, 16K bytes
- 1 << 16, // index 6, 64K bytes
- 1 << 18, // index 7, 256K bytes
- 1 << 20, // index 8, 1M bytes
- 1 << 22, // index 9, 4M bytes
- 1 << 24, // index 10, 16M bytes
- 1 << 26, // index 11, 64M bytes
- 1 << 28, // index 12, 128M bytes
- }
-
- _DEBUG = false
-)
-
-type BytesPool struct {
- chunkPools []*byteChunkPool
-}
-
-func NewBytesPool() *BytesPool {
- var bp BytesPool
- for _, size := range ChunkSizes {
- bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size))
- }
- ret := &bp
- if _DEBUG {
- t := time.NewTicker(10 * time.Second)
- go func() {
- for {
- println("buffer:", ret.String())
- <-t.C
- }
- }()
- }
- return ret
-}
-
-func (m *BytesPool) String() string {
- var buf bytes.Buffer
- for index, size := range ChunkSizes {
- if m.chunkPools[index].count > 0 {
- buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count))
- }
- }
- return buf.String()
-}
-
-func findChunkPoolIndex(size int) int {
- if size <= 0 {
- return -1
- }
- size = (size - 1) >> 4
- ret := 0
- for size > 0 {
- size = size >> 2
- ret = ret + 1
- }
- if ret >= len(ChunkSizes) {
- return -1
- }
- return ret
-}
-
-func (m *BytesPool) Get(size int) []byte {
- index := findChunkPoolIndex(size)
- // println("get index:", index)
- if index < 0 {
- return make([]byte, size)
- }
- return m.chunkPools[index].Get()
-}
-
-func (m *BytesPool) Put(b []byte) {
- index := findChunkPoolIndex(len(b))
- // println("put index:", index)
- if index < 0 {
- return
- }
- m.chunkPools[index].Put(b)
-}
-
-// a pool of fix-sized []byte chunks. The pool size is managed by Go GC
-type byteChunkPool struct {
- sync.Pool
- chunkSizeLimit int
- count int64
-}
-
-var count int
-
-func newByteChunkPool(chunkSizeLimit int) *byteChunkPool {
- var m byteChunkPool
- m.chunkSizeLimit = chunkSizeLimit
- m.Pool.New = func() interface{} {
- count++
- // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
- return make([]byte, m.chunkSizeLimit)
- }
- return &m
-}
-
-func (m *byteChunkPool) Get() []byte {
- // println("before get size:", m.chunkSizeLimit, "count:", m.count)
- atomic.AddInt64(&m.count, 1)
- return m.Pool.Get().([]byte)
-}
-
-func (m *byteChunkPool) Put(b []byte) {
- atomic.AddInt64(&m.count, -1)
- // println("after put get size:", m.chunkSizeLimit, "count:", m.count)
- m.Pool.Put(b)
-}
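
The removed pool bucketed requests into 13 size classes growing by 4x from 16 bytes up to 1<<28 bytes; findChunkPoolIndex maps a requested size to the smallest class that fits and returns -1 for sizes that are non-positive or too large for any class. A standalone sketch of that bucketing math, mirroring the removed logic (hypothetical helper, not part of the repo):

	package example // sketch of the removed size-class math

	// bucketIndex mirrors the removed findChunkPoolIndex: 13 size classes
	// growing 4x from 16 bytes (index 0) up to 1<<28 bytes (index 12).
	func bucketIndex(size int) int {
		if size <= 0 {
			return -1 // no class for empty or negative requests
		}
		size = (size - 1) >> 4 // discount the 16-byte base class
		idx := 0
		for size > 0 {
			size >>= 2 // each further class is 4x larger
			idx++
		}
		if idx >= 13 { // requests larger than the biggest class bypassed the pool
			return -1
		}
		return idx
	}

For example, bucketIndex(1080) == 4 (the 4K class) and bucketIndex(1<<28) == 12, matching the removed cases in bytes_pool_test.go below.
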
diff --git a/weed/util/bytes_pool_test.go b/weed/util/bytes_pool_test.go
deleted file mode 100644
index 3f37c16cf..000000000
--- a/weed/util/bytes_pool_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package util
-
-import (
- "testing"
-)
-
-func TestTTLReadWrite(t *testing.T) {
- var tests = []struct {
- n int // input
- expected int // expected result
- }{
- {0, -1},
- {1, 0},
- {1 << 4, 0},
- {1 << 6, 1},
- {1 << 8, 2},
- {1 << 10, 3},
- {1 << 12, 4},
- {1 << 14, 5},
- {1 << 16, 6},
- {1 << 18, 7},
- {1<<4 + 1, 1},
- {1<<6 + 1, 2},
- {1<<8 + 1, 3},
- {1<<10 + 1, 4},
- {1<<12 + 1, 5},
- {1<<14 + 1, 6},
- {1<<16 + 1, 7},
- {1<<18 + 1, 8},
- {1<<28 - 1, 12},
- {1 << 28, 12},
- {1<<28 + 2134, -1},
- {1080, 4},
- }
- for _, tt := range tests {
- actual := findChunkPoolIndex(tt.n)
- if actual != tt.expected {
- t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual)
- }
- }
-}