author     chrislu <chris.lu@gmail.com>  2022-08-26 16:49:51 -0700
committer  chrislu <chris.lu@gmail.com>  2022-08-26 16:49:51 -0700
commit     8dae81c5edaea2b25a79d49c4a08ffadfdc6fddc
tree       ce1ef0be521398d06255a97eb531d97b7f940b47
parent     e0f4366f4ca3259bfe582fb1dd8951f2005160ab
parent     5df105b1f94b8776d18159ae213da39299e2ea37
Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
-rw-r--r--  weed/filer/reader_cache.go       45
-rw-r--r--  weed/filer/reader_pattern.go     20
-rw-r--r--  weed/mount/filehandle.go         14
-rw-r--r--  weed/mount/filehandle_map.go      7
-rw-r--r--  weed/mount/weedfs.go              9
-rw-r--r--  weed/mount/weedfs_dir_lookup.go  12
6 files changed, 70 insertions, 37 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index eb2308758..89db04eb0 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -3,6 +3,7 @@ package filer
import (
"fmt"
"sync"
+ "sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
@@ -20,17 +21,17 @@ type ReaderCache struct {
type SingleChunkCacher struct {
sync.Mutex
- parent *ReaderCache
- chunkFileId string
- data []byte
- err error
- cipherKey []byte
- isGzipped bool
- chunkSize int
- shouldCache bool
- wg sync.WaitGroup
- cacheStartedCh chan struct{}
- completedTime time.Time
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ cacheStartedCh chan struct{}
+ completedTimeNew int64
}
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -50,13 +51,17 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
rc.Lock()
defer rc.Unlock()
+ if len(rc.downloaders) >= rc.limit {
+ return
+ }
+
for _, chunkView := range chunkViews {
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
}
if len(rc.downloaders) >= rc.limit {
- // if still no slots, return
+ // abort when slots are filled
return
}
@@ -74,27 +79,28 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock()
- defer rc.Unlock()
+
if cacher, found := rc.downloaders[fileId]; found {
if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
+ rc.Unlock()
return n, err
}
}
if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
if n > 0 {
+ rc.Unlock()
return n, err
}
}
// clean up old downloaders
if len(rc.downloaders) >= rc.limit {
- oldestFid, oldestTime := "", time.Now()
+ oldestFid, oldestTime := "", time.Now().Unix()
for fid, downloader := range rc.downloaders {
- if !downloader.completedTime.IsZero() {
- if downloader.completedTime.Before(oldestTime) {
- oldestFid, oldestTime = fid, downloader.completedTime
- }
+ completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
+ if completedTime > 0 && completedTime < oldestTime {
+ oldestFid, oldestTime = fid, completedTime
}
}
if oldestFid != "" {
@@ -110,6 +116,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
go cacher.startCaching()
<-cacher.cacheStartedCh
rc.downloaders[fileId] = cacher
+ rc.Unlock()
return cacher.readChunkAt(buffer, offset)
}
@@ -172,7 +179,7 @@ func (s *SingleChunkCacher) startCaching() {
if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
}
- s.completedTime = time.Now()
+ atomic.StoreInt64(&s.completedTimeNew, time.Now().Unix())
return
}
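
Note on the reader_cache.go hunk above: the mutex-guarded completedTime (a time.Time) becomes completedTimeNew, an int64 Unix timestamp read and written through sync/atomic, so the eviction scan in ReadChunkAt can inspect other downloaders' completion times without taking their locks. A minimal standalone sketch of that pattern follows; the worker type and helper names are illustrative, not part of SeaweedFS.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// worker stands in for SingleChunkCacher: completion is recorded as a Unix
// timestamp so other goroutines can read it without holding the worker's lock.
type worker struct {
	completedUnix int64 // 0 means still running
}

func (w *worker) finish() {
	atomic.StoreInt64(&w.completedUnix, time.Now().Unix())
}

// oldestFinished mirrors the eviction scan in ReadChunkAt: it returns the
// index of the worker that completed earliest, or -1 if none have completed.
func oldestFinished(ws []*worker) int {
	oldestIdx, oldestTime := -1, time.Now().Unix()
	for i, w := range ws {
		if t := atomic.LoadInt64(&w.completedUnix); t > 0 && t < oldestTime {
			oldestIdx, oldestTime = i, t
		}
	}
	return oldestIdx
}

func main() {
	ws := []*worker{{}, {}, {}}
	atomic.StoreInt64(&ws[1].completedUnix, 100) // pretend it finished long ago
	atomic.StoreInt64(&ws[2].completedUnix, 200)
	fmt.Println(oldestFinished(ws)) // 1
}
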
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
index e32f7fc2d..b0906e99f 100644
--- a/weed/filer/reader_pattern.go
+++ b/weed/filer/reader_pattern.go
@@ -1,5 +1,9 @@
package filer
+import (
+ "sync/atomic"
+)
+
type ReaderPattern struct {
isSequentialCounter int64
lastReadStopOffset int64
@@ -18,18 +22,20 @@ func NewReaderPattern() *ReaderPattern {
}
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
- if rp.lastReadStopOffset == offset {
- if rp.isSequentialCounter < ModeChangeLimit {
- rp.isSequentialCounter++
+ lastOffset := atomic.SwapInt64(&rp.lastReadStopOffset, offset+int64(size))
+ counter := atomic.LoadInt64(&rp.isSequentialCounter)
+
+ if lastOffset == offset {
+ if counter < ModeChangeLimit {
+ atomic.AddInt64(&rp.isSequentialCounter, 1)
}
} else {
- if rp.isSequentialCounter > -ModeChangeLimit {
- rp.isSequentialCounter--
+ if counter > -ModeChangeLimit {
+ atomic.AddInt64(&rp.isSequentialCounter, -1)
}
}
- rp.lastReadStopOffset = offset + int64(size)
}
func (rp *ReaderPattern) IsRandomMode() bool {
- return rp.isSequentialCounter < 0
+ return atomic.LoadInt64(&rp.isSequentialCounter) < 0
}
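
The reader_pattern.go hunk moves the sequential/random detection onto sync/atomic: the previous stop offset is exchanged in a single Swap, and the score is clamped with a Load followed by Add (which, as in the patch, tolerates a small race between the two). A rough self-contained sketch of the same idea, with hypothetical names:

package main

import (
	"fmt"
	"sync/atomic"
)

const modeChangeLimit = 3 // same role as ModeChangeLimit in the patch

type readPattern struct {
	sequentialScore int64 // >= 0 means sequential, < 0 means random
	lastStopOffset  int64
}

// observe records one read; it is lock-free but, like the patch, allows a
// small window between loading and adjusting the score.
func (p *readPattern) observe(offset, size int64) {
	last := atomic.SwapInt64(&p.lastStopOffset, offset+size)
	score := atomic.LoadInt64(&p.sequentialScore)
	if last == offset { // read continues where the previous one stopped
		if score < modeChangeLimit {
			atomic.AddInt64(&p.sequentialScore, 1)
		}
	} else {
		if score > -modeChangeLimit {
			atomic.AddInt64(&p.sequentialScore, -1)
		}
	}
}

func (p *readPattern) isRandom() bool {
	return atomic.LoadInt64(&p.sequentialScore) < 0
}

func main() {
	p := &readPattern{}
	p.observe(0, 4096)
	p.observe(4096, 4096) // sequential continuation
	p.observe(0, 512)     // jump back: counts against sequential
	fmt.Println(p.isRandom()) // false
}
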
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 8175c61f4..4595764ee 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -1,12 +1,14 @@
package mount
import (
+ "sync"
+
+ "golang.org/x/exp/slices"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "golang.org/x/exp/slices"
- "sync"
)
type FileHandleId uint64
@@ -57,12 +59,20 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
defer fh.entryLock.Unlock()
return fh.entry
}
+
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.entry = entry
}
+func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ fn(fh.entry)
+ return fh.entry
+}
+
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
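
The new FileHandle.UpdateEntry runs a caller-supplied mutation while entryLock is held, replacing GetEntry-then-SetEntry sequences that could interleave with concurrent writers (weedfs.go below switches maybeReadEntry to it). A simplified sketch of this closure-under-lock pattern, using a stand-in entry type rather than filer_pb.Entry:

package main

import (
	"fmt"
	"sync"
)

// entry stands in for filer_pb.Entry in this sketch.
type entry struct {
	name string
	size uint64
}

type fileHandle struct {
	entryLock sync.Mutex
	entry     *entry
}

// UpdateEntry applies fn while the lock is held, so the read-modify-write
// cannot interleave with SetEntry or other UpdateEntry calls.
func (fh *fileHandle) UpdateEntry(fn func(e *entry)) *entry {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	fn(fh.entry)
	return fh.entry
}

func main() {
	fh := &fileHandle{entry: &entry{name: "a.txt"}}
	updated := fh.UpdateEntry(func(e *entry) {
		if e != nil {
			e.size = 1024
		}
	})
	fmt.Println(updated.name, updated.size)
}
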
diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go
index a5e1016ca..4cf674166 100644
--- a/weed/mount/filehandle_map.go
+++ b/weed/mount/filehandle_map.go
@@ -1,8 +1,9 @@
package mount
import (
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
type FileHandleToInode struct {
@@ -49,7 +50,9 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
} else {
fh.counter++
}
- fh.entry = entry
+ if fh.entry != entry {
+ fh.SetEntry(entry)
+ }
return fh
}
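
In AcquireFileHandle the bare assignment fh.entry = entry becomes a call to SetEntry, so the write happens under the same entryLock that readers take. A small sketch under simplified types; the closing comment notes what `go run -race` would flag if the writer assigned the field directly.

package main

import (
	"fmt"
	"sync"
)

type entry struct{ name string }

type fileHandle struct {
	entryLock sync.Mutex
	entry     *entry
}

func (fh *fileHandle) SetEntry(e *entry) {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	fh.entry = e
}

func (fh *fileHandle) GetEntry() *entry {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	return fh.entry
}

func main() {
	fh := &fileHandle{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); fh.SetEntry(&entry{name: "new"}) }() // writer
	go func() { defer wg.Done(); _ = fh.GetEntry() }()                // concurrent reader
	wg.Wait()
	fmt.Println(fh.GetEntry().name)
	// Writing fh.entry = ... directly in the writer goroutine, instead of
	// calling SetEntry, would be reported by `go run -race` as a data race,
	// which is what the AcquireFileHandle change avoids.
}
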
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 9f1d85ab5..7cff71c52 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -135,10 +135,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
- entry = fh.GetEntry()
- if entry != nil && fh.entry.Attributes == nil {
- entry.Attributes = &filer_pb.FuseAttributes{}
- }
+ entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
+ if entry != nil && fh.entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
+ }
+ })
} else {
entry, status = wfs.maybeLoadEntry(path)
}
diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go
index 7a9b7fecc..49e4b1b56 100644
--- a/weed/mount/weedfs_dir_lookup.go
+++ b/weed/mount/weedfs_dir_lookup.go
@@ -2,7 +2,9 @@ package mount
import (
"context"
+
"github.com/hanwen/go-fuse/v2/fuse"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
@@ -55,9 +57,13 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
- if fh, found := wfs.fhmap.FindFileHandle(inode); found && fh.entry != nil {
- glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
- localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
+ if fh, found := wfs.fhmap.FindFileHandle(inode); found {
+ fh.entryLock.Lock()
+ if fh.entry != nil {
+ glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
+ localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
+ }
+ fh.entryLock.Unlock()
}
wfs.outputFilerEntry(out, inode, localEntry)
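
The weedfs_dir_lookup.go change wraps the read of fh.entry (and its conversion via filer.FromPbEntry) in fh.entryLock, so Lookup cannot observe the entry pointer while another goroutine replaces it; the converted localEntry is then used after the lock is released. A tiny sketch of that snapshot-under-lock shape, with hypothetical names standing in for the FUSE types:

package main

import (
	"fmt"
	"sync"
)

type pbEntry struct{ Name string } // stands in for *filer_pb.Entry

type fileHandle struct {
	entryLock sync.Mutex
	entry     *pbEntry
}

// snapshotEntry copies what Lookup needs while the lock is held; the copy is
// then safe to use after the lock is released.
func (fh *fileHandle) snapshotEntry() (pbEntry, bool) {
	fh.entryLock.Lock()
	defer fh.entryLock.Unlock()
	if fh.entry == nil {
		return pbEntry{}, false
	}
	return *fh.entry, true
}

func main() {
	fh := &fileHandle{entry: &pbEntry{Name: "opened.txt"}}
	if snap, ok := fh.snapshotEntry(); ok {
		fmt.Println("lookup sees opened file:", snap.Name)
	}
}
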