aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filechunk_manifest.go15
-rw-r--r--weed/filer/reader_at.go13
-rw-r--r--weed/filer/reader_cache.go56
-rw-r--r--weed/util/mem/slot_pool.go23
4 files changed, 57 insertions, 50 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 2c9dc5e74..f2f1c7f16 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -104,15 +104,6 @@ func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, f
return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0)
}
-func fetchChunkRange(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
- urlStrings, err := lookupFileIdFn(fileId)
- if err != nil {
- glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return 0, err
- }
- return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, false, offset)
-}
-
func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
var shouldRetry bool
@@ -124,8 +115,10 @@ func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
urlString = url.PathEscape(urlString)
}
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
- x := copy(buffer[n:], data)
- n += x
+ if n < len(buffer) {
+ x := copy(buffer[n:], data)
+ n += x
+ }
})
if !shouldRetry {
break
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index a38a0bfd5..8ee627a21 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -21,6 +21,7 @@ type ChunkReadAt struct {
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
+ lastChunkFid string
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -85,7 +86,7 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
- readerCache: newReaderCache(5, chunkCache, lookupFn),
+ readerCache: newReaderCache(32, chunkCache, lookupFn),
readerPattern: NewReaderPattern(),
}
}
@@ -167,12 +168,12 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
}
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
- for i, nextChunk := range nextChunkViews {
- if i < 2 {
- c.readerCache.MaybeCache(nextChunk.FileId, nextChunk.CipherKey, nextChunk.IsGzipped, int(nextChunk.ChunkSize))
- } else {
- break
+ if c.lastChunkFid != "" && c.lastChunkFid != chunkView.FileId {
+ if chunkView.Offset == 0 { // start of a new chunk
+ c.readerCache.UnCache(c.lastChunkFid)
+ c.readerCache.MaybeCache(nextChunkViews)
}
}
+ c.lastChunkFid = chunkView.FileId
return
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 1a0dc6a31..4f2c52303 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -40,41 +40,33 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
- rc.Lock()
- defer rc.Unlock()
- if _, found := rc.downloaders[fileId]; found {
- return
- }
+func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
if rc.lookupFileIdFn == nil {
return
}
- // if too many, delete one of them?
- if len(rc.downloaders) >= rc.limit {
- oldestFid, oldestTime := "", time.Now()
- for fid, downloader := range rc.downloaders {
- if !downloader.completedTime.IsZero() {
- if downloader.completedTime.Before(oldestTime) {
- oldestFid, oldestTime = fid, downloader.completedTime
- }
- }
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, chunkView := range chunkViews {
+ if _, found := rc.downloaders[chunkView.FileId]; found {
+ continue
}
- if oldestFid != "" {
- oldDownloader := rc.downloaders[oldestFid]
- delete(rc.downloaders, oldestFid)
- oldDownloader.destroy()
- } else {
+
+ if len(rc.downloaders) >= rc.limit {
// if still no slots, return
return
}
- }
- cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
- cacher.wg.Add(1)
- go cacher.startCaching()
- cacher.wg.Wait()
- rc.downloaders[fileId] = cacher
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // cache this chunk if not yet
+ cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[chunkView.FileId] = cacher
+
+ }
return
}
@@ -108,6 +100,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
+ // glog.V(4).Infof("cache1 %s", fileId)
+
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
cacher.wg.Add(1)
go cacher.startCaching()
@@ -117,6 +111,16 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
return cacher.readChunkAt(buffer, offset)
}
+func (rc *ReaderCache) UnCache(fileId string) {
+ rc.Lock()
+ defer rc.Unlock()
+ // glog.V(4).Infof("uncache %s", fileId)
+ if downloader, found := rc.downloaders[fileId]; found {
+ downloader.destroy()
+ delete(rc.downloaders, fileId)
+ }
+}
+
func (rc *ReaderCache) destroy() {
rc.Lock()
defer rc.Unlock()
diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go
index 5bd759ab7..f7aa86e05 100644
--- a/weed/util/mem/slot_pool.go
+++ b/weed/util/mem/slot_pool.go
@@ -41,14 +41,23 @@ func getSlotPool(size int) *sync.Pool {
var total int64
func Allocate(size int) []byte {
- newVal := atomic.AddInt64(&total, 1)
- glog.V(4).Infof("++> %d", newVal)
- slab := *getSlotPool(size).Get().(*[]byte)
- return slab[:size]
+ pool := getSlotPool(size)
+ if pool != nil {
+
+ newVal := atomic.AddInt64(&total, 1)
+ glog.V(4).Infof("++> %d", newVal)
+
+ slab := *pool.Get().(*[]byte)
+ return slab[:size]
+ }
+ return make([]byte, size)
}
func Free(buf []byte) {
- newVal := atomic.AddInt64(&total, -1)
- glog.V(4).Infof("--> %d", newVal)
- getSlotPool(cap(buf)).Put(&buf)
+ pool := getSlotPool(cap(buf))
+ if pool != nil {
+ newVal := atomic.AddInt64(&total, -1)
+ glog.V(4).Infof("--> %d", newVal)
+ pool.Put(&buf)
+ }
}