aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-12-04 23:33:05 -0800
committerchrislu <chris.lu@gmail.com>2022-12-04 23:33:05 -0800
commit94bc9afd9d3f8e049219c1cdc9f0d6e0eb4cf456 (patch)
tree2dec30792acbe4600279d9df607d4d61677222fe
parent2b783738d6d3b4b2810b4a3b3516d59a1b9edc46 (diff)
downloadseaweedfs-94bc9afd9d3f8e049219c1cdc9f0d6e0eb4cf456.tar.xz
seaweedfs-94bc9afd9d3f8e049219c1cdc9f0d6e0eb4cf456.zip
refactor: moved to locked entry
-rw-r--r--weed/mount/filehandle.go40
-rw-r--r--weed/mount/filehandle_map.go2
-rw-r--r--weed/mount/filehandle_read.go2
-rw-r--r--weed/mount/locked_entry.go42
-rw-r--r--weed/mount/weedfs_attr.go6
-rw-r--r--weed/mount/weedfs_dir_lookup.go8
-rw-r--r--weed/mount/weedfs_dir_read.go2
-rw-r--r--weed/mount/weedfs_file_copy_range.go4
-rw-r--r--weed/mount/weedfs_file_lseek.go4
-rw-r--r--weed/mount/weedfs_file_sync.go5
-rw-r--r--weed/mount/weedfs_file_write.go2
-rw-r--r--weed/mount/weedfs_rename.go7
-rw-r--r--weed/mount/weedfs_xattr.go12
13 files changed, 74 insertions, 62 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index c2a197da7..7281ede66 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -1,27 +1,23 @@
package mount
import (
- "golang.org/x/sync/semaphore"
- "math"
- "sync"
-
- "golang.org/x/exp/slices"
-
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "golang.org/x/exp/slices"
+ "golang.org/x/sync/semaphore"
+ "math"
)
type FileHandleId uint64
type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *filer_pb.Entry
- entryLock sync.Mutex
- inode uint64
- wfs *WFS
+ fh FileHandleId
+ counter int64
+ entry *LockedEntry
+ inode uint64
+ wfs *WFS
// cache file has been written to
dirtyMetadata bool
@@ -48,6 +44,9 @@ func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_p
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
+ fh.entry = &LockedEntry{
+ Entry: entry,
+ }
return fh
}
@@ -58,27 +57,18 @@ func (fh *FileHandle) FullPath() util.FullPath {
}
func (fh *FileHandle) GetEntry() *filer_pb.Entry {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- return fh.entry
+ return fh.entry.GetEntry()
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- fh.entry = entry
+ fh.entry.SetEntry(entry)
}
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- fn(fh.entry)
- return fh.entry
+ return fh.entry.UpdateEntry(fn)
}
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
if fh.entry == nil {
return
@@ -107,7 +97,7 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
- fh.entry.Chunks = append(fh.entry.GetChunks(), newChunks...)
+ fh.entry.AppendChunks(newChunks)
fh.entryViewCache = nil
}
diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go
index 4cf674166..cc5885ffc 100644
--- a/weed/mount/filehandle_map.go
+++ b/weed/mount/filehandle_map.go
@@ -50,7 +50,7 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
} else {
fh.counter++
}
- if fh.entry != entry {
+ if fh.GetEntry() != entry {
fh.SetEntry(entry)
}
return fh
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 08f678e69..a316a16cd 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -26,7 +26,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileFullPath := fh.FullPath()
- entry := fh.entry
+ entry := fh.GetEntry()
if entry == nil {
return 0, io.EOF
}
diff --git a/weed/mount/locked_entry.go b/weed/mount/locked_entry.go
new file mode 100644
index 000000000..f3b4bf484
--- /dev/null
+++ b/weed/mount/locked_entry.go
@@ -0,0 +1,42 @@
+package mount
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "sync"
+)
+
+type LockedEntry struct {
+ *filer_pb.Entry
+ sync.RWMutex
+}
+
+func (le *LockedEntry) GetEntry() *filer_pb.Entry {
+ le.RLock()
+ defer le.RUnlock()
+ return le.Entry
+}
+
+func (le *LockedEntry) SetEntry(entry *filer_pb.Entry) {
+ le.Lock()
+ defer le.Unlock()
+ le.Entry = entry
+}
+
+func (le *LockedEntry) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
+ le.Lock()
+ defer le.Unlock()
+ fn(le.Entry)
+ return le.Entry
+}
+
+func (le *LockedEntry) GetChunks() []*filer_pb.FileChunk {
+ le.RLock()
+ defer le.RUnlock()
+ return le.Entry.Chunks
+}
+
+func (le *LockedEntry) AppendChunks(newChunks []*filer_pb.FileChunk) {
+ le.Lock()
+ defer le.Unlock()
+ le.Entry.Chunks = append(le.Entry.Chunks, newChunks...)
+}
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
index 7691d4e59..3550066e0 100644
--- a/weed/mount/weedfs_attr.go
+++ b/weed/mount/weedfs_attr.go
@@ -25,7 +25,7 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
} else {
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry)
+ wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry())
out.Nlink = 0
return fuse.OK
}
@@ -44,10 +44,6 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
if status != fuse.OK {
return status
}
- if fh != nil {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- }
if size, ok := input.GetSize(); ok && entry != nil {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks()))
diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go
index 49e4b1b56..015f04e99 100644
--- a/weed/mount/weedfs_dir_lookup.go
+++ b/weed/mount/weedfs_dir_lookup.go
@@ -58,12 +58,10 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
- fh.entryLock.Lock()
- if fh.entry != nil {
- glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
- localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
+ if entry := fh.GetEntry(); entry != nil {
+ glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
+ localEntry = filer.FromPbEntry(string(dirPath), entry)
}
- fh.entryLock.Unlock()
}
wfs.outputFilerEntry(out, inode, localEntry)
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index ea64feeb7..f140fd86f 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -173,7 +173,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
- entry = filer.FromPbEntry(string(dirPath), fh.entry)
+ entry = filer.FromPbEntry(string(dirPath), fh.GetEntry())
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index bc092a252..4b0d22137 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -46,8 +46,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// lock source and target file handles
fhOut.orderedMutex.Acquire(context.Background(), 1)
defer fhOut.orderedMutex.Release(1)
- fhOut.entryLock.Lock()
- defer fhOut.entryLock.Unlock()
if fhOut.entry == nil {
return 0, fuse.ENOENT
@@ -56,8 +54,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
if fhIn.fh != fhOut.fh {
fhIn.orderedMutex.Acquire(context.Background(), 1)
defer fhIn.orderedMutex.Release(1)
- fhIn.entryLock.Lock()
- defer fhIn.entryLock.Unlock()
}
// directories are not supported
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index 43970983b..69edf7144 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -38,10 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
// lock the file until the proper offset was calculated
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- fileSize := int64(filer.FileSize(fh.entry))
+ fileSize := int64(filer.FileSize(fh.GetEntry()))
offset := max(int64(in.Offset), 0)
glog.V(4).Infof(
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 585ca0b47..244963ad3 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -118,10 +118,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
-
- entry := fh.entry
+ entry := fh.GetEntry()
if entry == nil {
return nil
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 255d4adc9..7b13d54ff 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -49,7 +49,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
- entry := fh.entry
+ entry := fh.GetEntry()
if entry == nil {
return 0, fuse.OK
}
diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go
index 42dcf348d..58c60aa28 100644
--- a/weed/mount/weedfs_rename.go
+++ b/weed/mount/weedfs_rename.go
@@ -235,8 +235,11 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath)
if sourceInode != 0 {
- if fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode); foundFh && fh.entry != nil {
- fh.entry.Name = newName
+ fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode)
+ if foundFh {
+ if entry := fh.GetEntry(); entry != nil {
+ entry.Name = newName
+ }
}
// invalidate attr and data
// wfs.fuseServer.InodeNotify(sourceInode, 0, -1)
diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go
index b03fa01f1..7a5109ee8 100644
--- a/weed/mount/weedfs_xattr.go
+++ b/weed/mount/weedfs_xattr.go
@@ -103,17 +103,13 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
}
}
- path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
+ path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
if entry == nil {
return fuse.ENOENT
}
- if fh != nil {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- }
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
@@ -181,17 +177,13 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
if len(attr) == 0 {
return fuse.EINVAL
}
- path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
+ path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return status
}
if entry == nil {
return fuse.OK
}
- if fh != nil {
- fh.entryLock.Lock()
- defer fh.entryLock.Unlock()
- }
if entry.Extended == nil {
return fuse.ENOATTR