aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/reader_cache.go67
1 files changed, 34 insertions, 33 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index bb7f4c87e..b409dbf61 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -2,11 +2,12 @@ package filer
import (
"fmt"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
- "sync"
- "time"
)
type ReaderCache struct {
@@ -19,17 +20,17 @@ type ReaderCache struct {
type SingleChunkCacher struct {
sync.Mutex
- cond *sync.Cond
- parent *ReaderCache
- chunkFileId string
- data []byte
- err error
- cipherKey []byte
- isGzipped bool
- chunkSize int
- shouldCache bool
- wg sync.WaitGroup
- completedTime time.Time
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ cacheStartedCh chan struct{}
+ completedTime time.Time
}
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -62,9 +63,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
// cache this chunk if not yet
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
- cacher.wg.Add(1)
go cacher.startCaching()
- cacher.wg.Wait()
+ <-cacher.cacheStartedCh
rc.downloaders[chunkView.FileId] = cacher
}
@@ -87,6 +87,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
+ // clean up old downloaders
if len(rc.downloaders) >= rc.limit {
oldestFid, oldestTime := "", time.Now()
for fid, downloader := range rc.downloaders {
@@ -106,9 +107,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
// glog.V(4).Infof("cache1 %s", fileId)
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
- cacher.wg.Add(1)
go cacher.startCaching()
- cacher.wg.Wait()
+ <-cacher.cacheStartedCh
rc.downloaders[fileId] = cacher
return cacher.readChunkAt(buffer, offset)
@@ -135,23 +135,24 @@ func (rc *ReaderCache) destroy() {
}
func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
- t := &SingleChunkCacher{
- parent: parent,
- chunkFileId: fileId,
- cipherKey: cipherKey,
- isGzipped: isGzipped,
- chunkSize: chunkSize,
- shouldCache: shouldCache,
+ return &SingleChunkCacher{
+ parent: parent,
+ chunkFileId: fileId,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ chunkSize: chunkSize,
+ shouldCache: shouldCache,
+ cacheStartedCh: make(chan struct{}),
}
- t.cond = sync.NewCond(t)
- return t
}
func (s *SingleChunkCacher) startCaching() {
+ s.wg.Add(1)
+ defer s.wg.Done()
s.Lock()
defer s.Unlock()
- s.wg.Done() // means this has been started
+ s.cacheStartedCh <- struct{}{} // means this has been started
urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
if err != nil {
@@ -168,16 +169,17 @@ func (s *SingleChunkCacher) startCaching() {
return
}
- s.completedTime = time.Now()
if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
}
- s.cond.Broadcast()
+ s.completedTime = time.Now()
return
}
func (s *SingleChunkCacher) destroy() {
+ // wait for all reads to finish before destroying the data
+ s.wg.Wait()
s.Lock()
defer s.Unlock()
@@ -185,16 +187,15 @@ func (s *SingleChunkCacher) destroy() {
mem.Free(s.data)
s.data = nil
}
+ close(s.cacheStartedCh)
}
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+ s.wg.Add(1)
+ defer s.wg.Done()
s.Lock()
defer s.Unlock()
- for s.completedTime.IsZero() {
- s.cond.Wait()
- }
-
if s.err != nil {
return 0, s.err
}