author    Patrick Schmidt <patrick.schmidt@innogames.com>  2022-08-21 20:54:02 +0200
committer GitHub <noreply@github.com>                      2022-08-21 11:54:02 -0700
commit    f49a9297c2d140cd8a8ba3e80842ad64db9be29d (patch)
tree      123a59a79ffaa994708f9cd03fb9ecfb43dbdb4f
parent    388f82f322fcadfeb91ccaf14109162f1641888a (diff)
download  seaweedfs-f49a9297c2d140cd8a8ba3e80842ad64db9be29d.tar.xz
          seaweedfs-f49a9297c2d140cd8a8ba3e80842ad64db9be29d.zip
Fix hanging reads in chunk cacher (#3473)
Sometimes, when an unexpected error occurs, the cacher sets an error and returns without broadcasting the condition signal, leaving the goroutine that runs readChunkAt stuck in Wait() forever.

I figured the condition variable is unnecessary, because readChunkAt acquires a lock that is still held by the cacher goroutine anyway. Callers of startCaching now wait for a start signal that is sent while that lock is held, which guarantees that readChunkAt cannot acquire the lock before startCaching does. This way readChunkAt executes normally and can check for the error.
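For context, the hang reduces to a classic sync.Cond mistake: an early return on the error path skips Broadcast, so a reader already parked in Wait() never wakes up. The sketch below is not the SeaweedFS code, just a minimal, self-contained Go reproduction with hypothetical names (cacher, fill, read, fetch):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// cacher mirrors the buggy shape: completion is signaled only through a
// sync.Cond, and the error path forgets to Broadcast.
type cacher struct {
	sync.Mutex
	cond *sync.Cond
	done bool
	err  error
}

// fetch stands in for the chunk download and always fails here.
func fetch() error { return errors.New("unexpected error") }

func (c *cacher) fill() {
	c.Lock()
	defer c.Unlock()
	if err := fetch(); err != nil {
		c.err = err
		return // BUG: no c.cond.Broadcast(), so waiters sleep forever
	}
	c.done = true
	c.cond.Broadcast()
}

func (c *cacher) read() error {
	c.Lock()
	defer c.Unlock()
	for !c.done { // never becomes true on the error path
		c.cond.Wait()
	}
	return c.err
}

func main() {
	c := &cacher{}
	c.cond = sync.NewCond(&c.Mutex)
	go c.fill()
	done := make(chan error, 1)
	go func() { done <- c.read() }()
	select {
	case err := <-done:
		fmt.Println("read returned:", err)
	case <-time.After(time.Second):
		fmt.Println("read is stuck, as in the pre-fix cacher")
	}
}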
-rw-r--r--  weed/filer/reader_cache.go  67
1 file changed, 34 insertions(+), 33 deletions(-)
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index bb7f4c87e..b409dbf61 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -2,11 +2,12 @@ package filer
import (
"fmt"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
- "sync"
- "time"
)
type ReaderCache struct {
@@ -19,17 +20,17 @@ type ReaderCache struct {
type SingleChunkCacher struct {
sync.Mutex
- cond *sync.Cond
- parent *ReaderCache
- chunkFileId string
- data []byte
- err error
- cipherKey []byte
- isGzipped bool
- chunkSize int
- shouldCache bool
- wg sync.WaitGroup
- completedTime time.Time
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ cacheStartedCh chan struct{}
+ completedTime time.Time
}
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -62,9 +63,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
// cache this chunk if not yet
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
- cacher.wg.Add(1)
go cacher.startCaching()
- cacher.wg.Wait()
+ <-cacher.cacheStartedCh
rc.downloaders[chunkView.FileId] = cacher
}
@@ -87,6 +87,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
+ // clean up old downloaders
if len(rc.downloaders) >= rc.limit {
oldestFid, oldestTime := "", time.Now()
for fid, downloader := range rc.downloaders {
@@ -106,9 +107,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
// glog.V(4).Infof("cache1 %s", fileId)
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
- cacher.wg.Add(1)
go cacher.startCaching()
- cacher.wg.Wait()
+ <-cacher.cacheStartedCh
rc.downloaders[fileId] = cacher
return cacher.readChunkAt(buffer, offset)
@@ -135,23 +135,24 @@ func (rc *ReaderCache) destroy() {
}
func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
- t := &SingleChunkCacher{
- parent: parent,
- chunkFileId: fileId,
- cipherKey: cipherKey,
- isGzipped: isGzipped,
- chunkSize: chunkSize,
- shouldCache: shouldCache,
+ return &SingleChunkCacher{
+ parent: parent,
+ chunkFileId: fileId,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ chunkSize: chunkSize,
+ shouldCache: shouldCache,
+ cacheStartedCh: make(chan struct{}),
}
- t.cond = sync.NewCond(t)
- return t
}
func (s *SingleChunkCacher) startCaching() {
+ s.wg.Add(1)
+ defer s.wg.Done()
s.Lock()
defer s.Unlock()
- s.wg.Done() // means this has been started
+ s.cacheStartedCh <- struct{}{} // means this has been started
urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
if err != nil {
@@ -168,16 +169,17 @@ func (s *SingleChunkCacher) startCaching() {
return
}
- s.completedTime = time.Now()
if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
}
- s.cond.Broadcast()
+ s.completedTime = time.Now()
return
}
func (s *SingleChunkCacher) destroy() {
+ // wait for all reads to finish before destroying the data
+ s.wg.Wait()
s.Lock()
defer s.Unlock()
@@ -185,16 +187,15 @@ func (s *SingleChunkCacher) destroy() {
mem.Free(s.data)
s.data = nil
}
+ close(s.cacheStartedCh)
}
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+ s.wg.Add(1)
+ defer s.wg.Done()
s.Lock()
defer s.Unlock()
- for s.completedTime.IsZero() {
- s.cond.Wait()
- }
-
if s.err != nil {
return 0, s.err
}
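Condensing the fix into a standalone sketch (again with hypothetical names; download() stands in for the HTTP fetch, and this is an illustration of the pattern, not the SeaweedFS code): startCaching takes the mutex first, signals readiness over an unbuffered channel while still holding it, and only then fills data/err. A caller unblocked by that channel must still queue on the mutex, so by the time readChunkAt holds the lock, data and err are final and no condition variable is needed. The WaitGroup's remaining job is to let destroy() wait out in-flight work.

package main

import (
	"fmt"
	"io"
	"sync"
)

// chunkCacher condenses the post-fix shape of SingleChunkCacher.
type chunkCacher struct {
	sync.Mutex
	wg        sync.WaitGroup // counts the filler and active readers, for destroy()
	startedCh chan struct{}  // unbuffered: the send happens while the lock is held
	data      []byte
	err       error
}

// download stands in for the HTTP chunk fetch.
func download() ([]byte, error) { return []byte("chunk-bytes"), nil }

func (c *chunkCacher) startCaching() {
	c.wg.Add(1)
	defer c.wg.Done()
	c.Lock()
	defer c.Unlock()
	c.startedCh <- struct{}{} // unblocks the spawner; the lock stays held
	c.data, c.err = download()
}

func (c *chunkCacher) readChunkAt(buf []byte, off int64) (int, error) {
	c.wg.Add(1)
	defer c.wg.Done()
	c.Lock() // queues behind startCaching, so data/err are final once acquired
	defer c.Unlock()
	if c.err != nil {
		return 0, c.err
	}
	if off >= int64(len(c.data)) {
		return 0, io.EOF
	}
	return copy(buf, c.data[off:]), nil
}

func (c *chunkCacher) destroy() {
	c.wg.Wait() // let in-flight reads drain before freeing the data
	c.Lock()
	defer c.Unlock()
	c.data = nil
	close(c.startedCh)
}

func main() {
	c := &chunkCacher{startedCh: make(chan struct{})}
	go c.startCaching()
	<-c.startedCh // returns only after startCaching already holds the lock
	buf := make([]byte, 16)
	n, err := c.readChunkAt(buf, 0)
	fmt.Println(n, err, string(buf[:n]))
	c.destroy()
}

The unbuffered send is the crux of the handshake: it cannot complete until the spawner has reached <-c.startedCh, and it happens under the lock, so the lock-acquisition order startCaching before readChunkAt is guaranteed even though no condition variable is involved.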