aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-09 23:08:38 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-09 23:08:38 -0800
commit5ba4b479f8955cd6e8cb3235753d54948a8cacf2 (patch)
treee3072e432087f77f378c5ba312ebbc9d14c3b7bd
parentbf5b795717f4bca3887e0826d4f5551b1499d79d (diff)
downloadseaweedfs-5ba4b479f8955cd6e8cb3235753d54948a8cacf2.tar.xz
seaweedfs-5ba4b479f8955cd6e8cb3235753d54948a8cacf2.zip
properly lock file.entry object
fix https://github.com/chrislusf/seaweedfs/issues/1882
-rw-r--r--weed/filesys/dir_link.go25
-rw-r--r--weed/filesys/dirty_page.go9
-rw-r--r--weed/filesys/file.go27
-rw-r--r--weed/filesys/filehandle.go66
-rw-r--r--weed/filesys/fscache.go5
5 files changed, 88 insertions, 44 deletions
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 29391faeb..606e52fcb 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -35,15 +35,20 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil, err
}
+ oldEntry := oldFile.getEntry()
+ if oldEntry == nil {
+ return nil, fuse.EIO
+ }
+
// update old file to hardlink mode
- if len(oldFile.entry.HardLinkId) == 0 {
- oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
- oldFile.entry.HardLinkCounter = 1
+ if len(oldEntry.HardLinkId) == 0 {
+ oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
+ oldEntry.HardLinkCounter = 1
}
- oldFile.entry.HardLinkCounter++
+ oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
- Entry: oldFile.entry,
+ Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
@@ -53,11 +58,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
- Attributes: oldFile.entry.Attributes,
- Chunks: oldFile.entry.Chunks,
- Extended: oldFile.entry.Extended,
- HardLinkId: oldFile.entry.HardLinkId,
- HardLinkCounter: oldFile.entry.HardLinkCounter,
+ Attributes: oldEntry.Attributes,
+ Chunks: oldEntry.Chunks,
+ Extended: oldEntry.Extended,
+ HardLinkId: oldEntry.HardLinkId,
+ HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f05a3a56a..8888cff96 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
- glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
+ glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false
}
- fileSize := int64(pages.f.entry.Attributes.FileSize)
+ entry := pages.f.getEntry()
+ if entry == nil {
+ return false
+ }
+
+ fileSize := int64(entry.Attributes.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index bd722b31f..5931dd2ff 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -5,6 +5,7 @@ import (
"io"
"os"
"sort"
+ "sync"
"time"
"github.com/seaweedfs/fuse"
@@ -33,6 +34,7 @@ type File struct {
dir *Dir
wfs *WFS
entry *filer_pb.Entry
+ entryLock sync.RWMutex
entryViewCache []filer.VisibleInterval
isOpen int
reader io.ReaderAt
@@ -47,7 +49,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
- entry := file.entry
+ entry := file.getEntry()
if file.isOpen <= 0 || entry == nil {
if entry, err = file.maybeLoadEntry(ctx); err != nil {
return err
@@ -258,7 +260,7 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
- entry = file.entry
+ entry = file.getEntry()
if file.isOpen > 0 {
return entry, nil
}
@@ -299,8 +301,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
}
+ entry := file.getEntry()
+ if entry == nil {
+ return
+ }
+
// pick out-of-order chunks from existing chunks
- for _, chunk := range file.entry.Chunks {
+ for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
@@ -318,18 +325,22 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.reader = nil
- glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
- file.entry.Chunks = append(file.entry.Chunks, newChunks...)
+ entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
+ file.entryLock.Lock()
+ defer file.entryLock.Unlock()
file.entry = entry
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
func (file *File) clearEntry() {
+ file.entryLock.Lock()
+ defer file.entryLock.Unlock()
file.entry = nil
file.entryViewCache = nil
file.reader = nil
@@ -359,3 +370,9 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
return nil
})
}
+
+func (file *File) getEntry() *filer_pb.Entry {
+ file.entryLock.RLock()
+ defer file.entryLock.RUnlock()
+ return file.entry
+}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index fb073c9cd..adec1cd70 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -40,8 +40,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Uid: uid,
Gid: gid,
}
- if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
+ entry := fh.f.getEntry()
+ if entry != nil {
+ entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
@@ -104,22 +105,27 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- fileSize := int64(filer.FileSize(fh.f.entry))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return 0, io.EOF
+ }
+
+ fileSize := int64(filer.FileSize(entry))
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
return 0, io.EOF
}
- if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) {
- totalRead := copy(buff, fh.f.entry.Content[offset:])
+ if offset+int64(len(buff)) <= int64(len(entry.Content)) {
+ totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@@ -158,8 +164,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
}
- fh.f.entry.Content = nil
- fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return fuse.EIO
+ }
+
+ entry.Content = nil
+ entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@@ -242,35 +253,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- if fh.f.entry.Attributes != nil {
- fh.f.entry.Attributes.Mime = fh.contentType
- if fh.f.entry.Attributes.Uid == 0 {
- fh.f.entry.Attributes.Uid = header.Uid
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return nil
+ }
+
+ if entry.Attributes != nil {
+ entry.Attributes.Mime = fh.contentType
+ if entry.Attributes.Uid == 0 {
+ entry.Attributes.Uid = header.Uid
}
- if fh.f.entry.Attributes.Gid == 0 {
- fh.f.entry.Attributes.Gid = header.Gid
+ if entry.Attributes.Gid == 0 {
+ entry.Attributes.Gid = header.Gid
}
- if fh.f.entry.Attributes.Crtime == 0 {
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
+ if entry.Attributes.Crtime == 0 {
+ entry.Attributes.Crtime = time.Now().Unix()
}
- fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
- fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
- fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
+ entry.Attributes.Collection = fh.dirtyPages.collection
+ entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
- Entry: fh.f.entry,
+ Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
- for i, chunk := range fh.f.entry.Chunks {
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
+ for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
@@ -278,7 +294,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
- fh.f.entry.Chunks = append(chunks, manifestChunks...)
+ entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index fdec8253c..6b1012090 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
- if f.entry != nil {
- f.entry.Name = f.Name
+ entry := f.getEntry()
+ if entry != nil {
+ entry.Name = f.Name
}
}
parent.disconnectChild(target)