aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/filehandle.go6
-rw-r--r--weed/mount/weedfs.go2
-rw-r--r--weed/mount/weedfs_file_copy_range.go9
-rw-r--r--weed/mount/weedfs_file_lseek.go5
-rw-r--r--weed/mount/weedfs_file_read.go5
-rw-r--r--weed/mount/weedfs_file_sync.go6
-rw-r--r--weed/mount/weedfs_file_write.go5
-rw-r--r--weed/util/lock_table.go147
-rw-r--r--weed/util/lock_table_test.go41
9 files changed, 211 insertions, 15 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 6513d96ba..2e08432c0 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -27,7 +27,6 @@ type FileHandle struct {
dirtyPages *PageWriter
reader *filer.ChunkReadAt
contentType string
- sync.RWMutex
isDeleted bool
@@ -102,8 +101,9 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
}
func (fh *FileHandle) ReleaseHandle() {
- fh.Lock()
- defer fh.Unlock()
+
+ fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
+ defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 4ac01b3e6..de7502688 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -78,6 +78,7 @@ type WFS struct {
dhmap *DirectoryHandleToInode
fuseServer *fuse.Server
IsOverQuota bool
+ fhLockTable *util.LockTable[FileHandleId]
}
func NewSeaweedFileSystem(option *Option) *WFS {
@@ -88,6 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(),
+ fhLockTable: util.NewLockTable[FileHandleId](),
}
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index 49bab17f3..50a2d5a95 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,6 +1,7 @@
package mount
import (
+ "github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"time"
@@ -44,16 +45,16 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
- fhOut.Lock()
- defer fhOut.Unlock()
+ fhOutActiveLock := fhOut.wfs.fhLockTable.AcquireLock("CopyFileRange", fhOut.fh, util.ExclusiveLock)
+ defer fhOut.wfs.fhLockTable.ReleaseLock(fhOut.fh, fhOutActiveLock)
if fhOut.entry == nil {
return 0, fuse.ENOENT
}
if fhIn.fh != fhOut.fh {
- fhIn.RLock()
- defer fhIn.RUnlock()
+ fhInActiveLock := fhIn.wfs.fhLockTable.AcquireLock("CopyFileRange", fhIn.fh, util.ExclusiveLock)
+ defer fhIn.wfs.fhLockTable.ReleaseLock(fhIn.fh, fhInActiveLock)
}
// directories are not supported
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index 9dfc4d4f1..35157d993 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -1,6 +1,7 @@
package mount
import (
+ "github.com/seaweedfs/seaweedfs/weed/util"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
- fh.RLock()
- defer fh.RUnlock()
+ fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Lseek", fh.fh, util.SharedLock)
+ defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index 11ff07641..bf9c89071 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -3,6 +3,7 @@ package mount
import (
"bytes"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -41,8 +42,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
- fh.RLock()
- defer fh.RUnlock()
+ fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
+ defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 4254e3830..74e16d43f 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"syscall"
"time"
)
@@ -89,8 +90,6 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
- fh.Lock()
- defer fh.Unlock()
// flush works at fh level
fileFullPath := fh.FullPath()
@@ -105,6 +104,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
}
+ fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
+ defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
+
if !fh.dirtyMetadata {
return fuse.OK
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 5a9a21ded..1ec20c294 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -2,6 +2,7 @@ package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"syscall"
"time"
@@ -48,8 +49,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
tsNs := time.Now().UnixNano()
- fh.Lock()
- defer fh.Unlock()
+ fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Write", fh.fh, util.ExclusiveLock)
+ defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
entry := fh.GetEntry()
if entry == nil {
diff --git a/weed/util/lock_table.go b/weed/util/lock_table.go
new file mode 100644
index 000000000..015e424f9
--- /dev/null
+++ b/weed/util/lock_table.go
@@ -0,0 +1,147 @@
+package util
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "sync"
+ "sync/atomic"
+)
+
+// LockTable is a table of per-key locks that can be held in shared or
+// exclusive mode. Requests for the same key are queued in the order in which
+// they arrive.
+type LockTable[T comparable] struct {
+	// lockIdSeq is advanced with atomic.AddInt64. It is kept as the first
+	// field so that it is 64-bit aligned on 32-bit platforms (ARM, 386,
+	// 32-bit MIPS), where a misaligned 64-bit atomic operation panics.
+	lockIdSeq int64
+	mu        sync.Mutex       // guards locks
+	locks     map[T]*LockEntry // lock state for every key that is currently in use
+}
+
+// LockEntry holds the lock state of a single key in a LockTable.
+type LockEntry struct {
+ mu sync.Mutex // guards the fields below and is the Locker behind cond
+ waiters []*ActiveLock // ordered queue of requests that have not been granted yet
+ activeLockOwnerCount int32 // incremented on every grant, decremented on every release
+ lockType LockType // type of the most recently granted lock
+ cond *sync.Cond // broadcast when a request leaves the queue or a lock is released
+}
+
+// LockType is the mode in which a lock is requested from a LockTable.
+type LockType int
+
+const (
+ // SharedLock is the zero value of LockType.
+ SharedLock LockType = iota
+ // ExclusiveLock makes AcquireLock wait until no other lock is active on the key.
+ ExclusiveLock
+)
+
+// ActiveLock is the handle returned by AcquireLock and handed back to ReleaseLock.
+type ActiveLock struct {
+ ID int64 // taken from the table's atomically incremented sequence
+ isDeleted bool // set by ReleaseLock when the lock is released while still queued
+ intention string // for debugging
+}
+
+// NewLockTable returns an empty LockTable whose key map is already allocated.
+func NewLockTable[T comparable]() *LockTable[T] {
+	table := new(LockTable[T])
+	table.locks = make(map[T]*LockEntry)
+	return table
+}
+
+// NewActiveLock creates a lock handle carrying the next ID of the table's
+// sequence and the given intention, which is only used for debugging output.
+func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
+	return &ActiveLock{
+		ID:        atomic.AddInt64(&lt.lockIdSeq, 1),
+		intention: intention,
+	}
+}
+
+// AcquireLock blocks until the lock for key can be granted in the requested
+// lockType and returns the handle that must later be passed to ReleaseLock.
+// The intention string is only used in debugging output.
+//
+// Requests are served in arrival order. An exclusive request is granted once
+// it is at the head of the queue and no other lock is active on the key; a
+// shared request is granted once it is at the head of the queue (or the queue
+// is empty) and no exclusive lock is active.
+func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
+	lt.mu.Lock()
+	// Get or create the lock entry for the key
+	entry, exists := lt.locks[key]
+	if !exists {
+		entry = &LockEntry{}
+		entry.cond = sync.NewCond(&entry.mu)
+		lt.locks[key] = entry
+	}
+	// Take the entry lock before letting go of the table lock (the same order
+	// ReleaseLock uses), so that ReleaseLock cannot run on this entry between
+	// the lookup and the moment this request is registered on it.
+	entry.mu.Lock()
+	lt.mu.Unlock()
+	defer entry.mu.Unlock()
+
+	lock = lt.NewActiveLock(intention)
+
+	// entry.lockType records the most recently granted type; together with a
+	// positive owner count it tells whether an exclusive lock is active.
+	exclusiveActive := func() bool {
+		return entry.activeLockOwnerCount > 0 && entry.lockType == ExclusiveLock
+	}
+
+	// Queue up if others are already waiting, if exclusive access is wanted,
+	// or if an exclusive lock is currently held.
+	if len(entry.waiters) > 0 || lockType == ExclusiveLock || exclusiveActive() {
+		glog.V(4).Infof("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
+		// The waiter IDs are debugging output, keep them behind the same verbosity
+		if glog.V(4) && len(entry.waiters) > 0 {
+			for _, waiter := range entry.waiters {
+				fmt.Printf(" %d", waiter.ID)
+			}
+			fmt.Printf("\n")
+		}
+		entry.waiters = append(entry.waiters, lock)
+		for !lock.isDeleted {
+			notAtHead := len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID
+			var conflicting bool
+			if lockType == ExclusiveLock {
+				// exclusive access conflicts with any active owner
+				conflicting = entry.activeLockOwnerCount > 0
+			} else {
+				// shared access only conflicts with an active exclusive owner
+				conflicting = exclusiveActive()
+			}
+			if !notAtHead && !conflicting {
+				break
+			}
+			entry.cond.Wait()
+		}
+		// Leave the queue and wake the others so that the next request can proceed
+		if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
+			entry.waiters = entry.waiters[1:]
+			entry.cond.Broadcast()
+		}
+	}
+
+	// Grant the lock
+	entry.activeLockOwnerCount++
+	entry.lockType = lockType
+	glog.V(4).Infof("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
+	if glog.V(4) && len(entry.waiters) > 0 {
+		for _, waiter := range entry.waiters {
+			fmt.Printf(" %d", waiter.ID)
+		}
+		fmt.Printf("\n")
+	}
+
+	return lock
+}
+
+// ReleaseLock gives up the lock on key that was obtained with AcquireLock and
+// wakes all requests waiting on that key. It is a no-op when the table has no
+// entry for key. The entry is dropped from the table only once nobody is
+// queued for it and nobody holds it any more.
+func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
+	lt.mu.Lock()
+	defer lt.mu.Unlock()
+
+	entry, exists := lt.locks[key]
+	if !exists {
+		return
+	}
+
+	entry.mu.Lock()
+	defer entry.mu.Unlock()
+
+	// If the lock is released while it is still queued, take it out of the
+	// queue and flag it so that its AcquireLock call stops waiting
+	for i, waiter := range entry.waiters {
+		if waiter == lock {
+			waiter.isDeleted = true
+			entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
+			break
+		}
+	}
+
+	glog.V(4).Infof("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
+	// The waiter IDs are debugging output, keep them behind the same verbosity
+	if glog.V(4) && len(entry.waiters) > 0 {
+		for _, waiter := range entry.waiters {
+			fmt.Printf(" %d", waiter.ID)
+		}
+		fmt.Printf("\n")
+	}
+	entry.activeLockOwnerCount--
+
+	// Forget the entry only when it is completely idle. Other shared owners may
+	// still hold it; dropping it earlier would let a new request start from a
+	// fresh entry and ignore them.
+	if len(entry.waiters) == 0 && entry.activeLockOwnerCount <= 0 {
+		delete(lt.locks, key)
+	}
+
+	// Notify the waiters
+	entry.cond.Broadcast()
+}
+
+// NOTE(review): this empty main lives in package util and is not called by
+// anything visible here; it looks like a leftover — confirm and remove.
+func main() {
+
+}
diff --git a/weed/util/lock_table_test.go b/weed/util/lock_table_test.go
new file mode 100644
index 000000000..272e5672f
--- /dev/null
+++ b/weed/util/lock_table_test.go
@@ -0,0 +1,41 @@
+package util
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+)
+
+// TestOrderedLock starts 50 goroutines that contend for a single key, every
+// fifth one in exclusive mode, and waits until all of them have acquired and
+// released the lock.
+func TestOrderedLock(t *testing.T) {
+	lt := NewLockTable[string]()
+
+	var wg sync.WaitGroup
+	// Simulate transactions requesting locks
+	for i := 1; i <= 50; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			key := "resource"
+			lockType := SharedLock
+			if i%5 == 0 {
+				lockType = ExclusiveLock
+			}
+
+			// Simulate attempting to acquire the lock
+			lock := lt.AcquireLock("", key, lockType)
+
+			// Lock acquired, perform some work.
+			// t.Logf is used because this file does not import glog; it is
+			// safe here since the test waits for every goroutine to finish.
+			t.Logf("ActiveLock %d acquired the lock.", lock.ID)
+
+			// Simulate some work
+			time.Sleep(time.Duration(rand.Int31n(10)*10) * time.Millisecond)
+
+			// Release the lock
+			lt.ReleaseLock(key, lock)
+			t.Logf("ActiveLock %d released the lock.", lock.ID)
+		}(i)
+	}
+
+	wg.Wait()
+}