aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-11 19:44:34 -0700
committerchrislu <chris.lu@gmail.com>2022-09-11 19:44:34 -0700
commit22064c342585bddaa7ebdb21e39cac7db87826df (patch)
treede8af6599b1a2d3702b61829bb96329eef22e1c4
parentb9112747b57a45d5c1ef476897ec5940c76fc7e8 (diff)
downloadseaweedfs-22064c342585bddaa7ebdb21e39cac7db87826df.tar.xz
seaweedfs-22064c342585bddaa7ebdb21e39cac7db87826df.zip
mount: ensure ordered file handle lock and unlock
-rw-r--r--weed/mount/filehandle.go13
-rw-r--r--weed/mount/weedfs_file_copy_range.go9
-rw-r--r--weed/mount/weedfs_file_lseek.go5
-rw-r--r--weed/mount/weedfs_file_read.go5
-rw-r--r--weed/mount/weedfs_file_sync.go8
-rw-r--r--weed/mount/weedfs_file_write.go5
6 files changed, 26 insertions, 19 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 5d1552ce6..aadcb3836 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -1,6 +1,8 @@
package mount
import (
+ "golang.org/x/sync/semaphore"
+ "math"
"sync"
"golang.org/x/exp/slices"
@@ -28,17 +30,18 @@ type FileHandle struct {
reader *filer.ChunkReadAt
contentType string
handle uint64
- sync.Mutex
+ orderedMutex *semaphore.Weighted
isDeleted bool
}
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{
- fh: handleId,
- counter: 1,
- inode: inode,
- wfs: wfs,
+ fh: handleId,
+ counter: 1,
+ inode: inode,
+ wfs: wfs,
+ orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index 412869abc..bc092a252 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,6 +1,7 @@
package mount
import (
+ "context"
"net/http"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -43,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
- fhOut.Lock()
- defer fhOut.Unlock()
+ fhOut.orderedMutex.Acquire(context.Background(), 1)
+ defer fhOut.orderedMutex.Release(1)
fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock()
@@ -53,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
if fhIn.fh != fhOut.fh {
- fhIn.Lock()
- defer fhIn.Unlock()
+ fhIn.orderedMutex.Acquire(context.Background(), 1)
+ defer fhIn.orderedMutex.Release(1)
fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock()
}
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index ed495f5b5..0564ac0ee 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -1,6 +1,7 @@
package mount
import (
+ "context"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
- fh.Lock()
- defer fh.Unlock()
+ fh.orderedMutex.Acquire(context.Background(), 1)
+ defer fh.orderedMutex.Release(1)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index 307ad5960..8375f9a5d 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -1,6 +1,7 @@
package mount
import (
+ "context"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -39,8 +40,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
- fh.Lock()
- defer fh.Unlock()
+ fh.orderedMutex.Acquire(context.Background(), 1)
+ defer fh.orderedMutex.Release(1)
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 4e1a9f3f0..7b80ddc73 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -55,8 +55,8 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
return fuse.ENOENT
}
- fh.Lock()
- defer fh.Unlock()
+ fh.orderedMutex.Acquire(context.Background(), 1)
+ defer fh.orderedMutex.Release(1)
return wfs.doFlush(fh, in.Uid, in.Gid)
}
@@ -87,8 +87,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
return fuse.ENOENT
}
- fh.Lock()
- defer fh.Unlock()
+ fh.orderedMutex.Acquire(context.Background(), 1)
+ defer fh.orderedMutex.Release(1)
return wfs.doFlush(fh, in.Uid, in.Gid)
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 2b7a6cea2..255d4adc9 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -1,6 +1,7 @@
package mount
import (
+ "context"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"syscall"
@@ -45,8 +46,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
- fh.Lock()
- defer fh.Unlock()
+ fh.orderedMutex.Acquire(context.Background(), 1)
+ defer fh.orderedMutex.Release(1)
entry := fh.entry
if entry == nil {