diff options
| -rw-r--r-- | weed/mount/filehandle.go | 6 | ||||
| -rw-r--r-- | weed/mount/weedfs.go | 2 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_copy_range.go | 9 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_lseek.go | 5 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_read.go | 5 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_sync.go | 6 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_write.go | 5 | ||||
| -rw-r--r-- | weed/util/lock_table.go | 147 | ||||
| -rw-r--r-- | weed/util/lock_table_test.go | 41 |
9 files changed, 211 insertions, 15 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 6513d96ba..2e08432c0 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -27,7 +27,6 @@ type FileHandle struct { dirtyPages *PageWriter reader *filer.ChunkReadAt contentType string - sync.RWMutex isDeleted bool @@ -102,8 +101,9 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { } func (fh *FileHandle) ReleaseHandle() { - fh.Lock() - defer fh.Unlock() + + fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock) + defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) fh.entryLock.Lock() defer fh.entryLock.Unlock() diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 4ac01b3e6..de7502688 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -78,6 +78,7 @@ type WFS struct { dhmap *DirectoryHandleToInode fuseServer *fuse.Server IsOverQuota bool + fhLockTable *util.LockTable[FileHandleId] } func NewSeaweedFileSystem(option *Option) *WFS { @@ -88,6 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)), fhmap: NewFileHandleToInode(), dhmap: NewDirectoryHandleToInode(), + fhLockTable: util.NewLockTable[FileHandleId](), } wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses))) diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index 49bab17f3..50a2d5a95 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -1,6 +1,7 @@ package mount import ( + "github.com/seaweedfs/seaweedfs/weed/util" "net/http" "time" @@ -44,16 +45,16 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) } // lock source and target file handles - fhOut.Lock() - defer fhOut.Unlock() + fhOutActiveLock := fhOut.wfs.fhLockTable.AcquireLock("CopyFileRange", fhOut.fh, util.ExclusiveLock) + defer fhOut.wfs.fhLockTable.ReleaseLock(fhOut.fh, fhOutActiveLock) if fhOut.entry == nil { return 0, fuse.ENOENT } if fhIn.fh != fhOut.fh { - fhIn.RLock() - defer fhIn.RUnlock() + fhInActiveLock := fhIn.wfs.fhLockTable.AcquireLock("CopyFileRange", fhIn.fh, util.ExclusiveLock) + defer fhIn.wfs.fhLockTable.ReleaseLock(fhIn.fh, fhInActiveLock) } // directories are not supported diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 9dfc4d4f1..35157d993 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -1,6 +1,7 @@ package mount import ( + "github.com/seaweedfs/seaweedfs/weed/util" "syscall" "github.com/hanwen/go-fuse/v2/fuse" @@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO } // lock the file until the proper offset was calculated - fh.RLock() - defer fh.RUnlock() + fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Lseek", fh.fh, util.SharedLock) + defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) fh.entryLock.RLock() defer fh.entryLock.RUnlock() diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index 11ff07641..bf9c89071 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -3,6 +3,7 @@ package mount import ( "bytes" "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "io" "github.com/hanwen/go-fuse/v2/fuse" @@ -41,8 +42,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse return nil, fuse.ENOENT } - fh.RLock() - defer fh.RUnlock() + fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock) + defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) offset := int64(in.Offset) totalRead, err := readDataByFileHandle(buff, fh, offset) diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 4254e3830..74e16d43f 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "syscall" "time" ) @@ -89,8 +90,6 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu } func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { - fh.Lock() - defer fh.Unlock() // flush works at fh level fileFullPath := fh.FullPath() @@ -105,6 +104,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { } } + fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock) + defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) + if !fh.dirtyMetadata { return fuse.OK } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 5a9a21ded..1ec20c294 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -2,6 +2,7 @@ package mount import ( "github.com/hanwen/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/util" "net/http" "syscall" "time" @@ -48,8 +49,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr tsNs := time.Now().UnixNano() - fh.Lock() - defer fh.Unlock() + fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Write", fh.fh, util.ExclusiveLock) + defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) entry := fh.GetEntry() if entry == nil { diff --git a/weed/util/lock_table.go b/weed/util/lock_table.go new file mode 100644 index 000000000..015e424f9 --- /dev/null +++ b/weed/util/lock_table.go @@ -0,0 +1,147 @@ +package util + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "sync" + "sync/atomic" +) + +// LockTable is a table of locks that can be acquired. +// Locks are acquired in order of request. +type LockTable[T comparable] struct { + mu sync.Mutex + locks map[T]*LockEntry + lockIdSeq int64 +} + +type LockEntry struct { + mu sync.Mutex + waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks + activeLockOwnerCount int32 + lockType LockType + cond *sync.Cond +} + +type LockType int + +const ( + SharedLock LockType = iota + ExclusiveLock +) + +type ActiveLock struct { + ID int64 + isDeleted bool + intention string // for debugging +} + +func NewLockTable[T comparable]() *LockTable[T] { + return &LockTable[T]{ + locks: make(map[T]*LockEntry), + } +} + +func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock { + id := atomic.AddInt64(<.lockIdSeq, 1) + l := &ActiveLock{ID: id, intention: intention} + return l +} + +func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) { + lt.mu.Lock() + // Get or create the lock entry for the key + entry, exists := lt.locks[key] + if !exists { + entry = &LockEntry{} + entry.cond = sync.NewCond(&entry.mu) + lt.locks[key] = entry + } + lt.mu.Unlock() + + lock = lt.NewActiveLock(intention) + + // If the lock is held exclusively, wait + entry.mu.Lock() + if len(entry.waiters) > 0 || lockType == ExclusiveLock { + glog.V(4).Infof("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.waiters = append(entry.waiters, lock) + if lockType == ExclusiveLock { + for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) { + entry.cond.Wait() + } + } else { + for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) { + entry.cond.Wait() + } + } + // Remove the transaction from the waiters list + if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID { + entry.waiters = entry.waiters[1:] + entry.cond.Broadcast() + } + } + entry.activeLockOwnerCount++ + + // Otherwise, grant the lock + entry.lockType = lockType + glog.V(4).Infof("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.mu.Unlock() + + return lock +} + +func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) { + lt.mu.Lock() + defer lt.mu.Unlock() + + entry, exists := lt.locks[key] + if !exists { + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + // Remove the transaction from the waiters list + for i, waiter := range entry.waiters { + if waiter == lock { + waiter.isDeleted = true + entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...) + break + } + } + + // If there are no waiters, release the lock + if len(entry.waiters) == 0 { + delete(lt.locks, key) + } + + glog.V(4).Infof("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.activeLockOwnerCount-- + + // Notify the next waiter + entry.cond.Broadcast() +} + +func main() { + +} diff --git a/weed/util/lock_table_test.go b/weed/util/lock_table_test.go new file mode 100644 index 000000000..272e5672f --- /dev/null +++ b/weed/util/lock_table_test.go @@ -0,0 +1,41 @@ +package util + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +func TestOrderedLock(t *testing.T) { + lt := NewLockTable[string]() + + var wg sync.WaitGroup + // Simulate transactions requesting locks + for i := 1; i <= 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + key := "resource" + lockType := SharedLock + if i%5 == 0 { + lockType = ExclusiveLock + } + + // Simulate attempting to acquire the lock + lock := lt.AcquireLock("", key, lockType) + + // Lock acquired, perform some work + glog.V(4).Infof("ActiveLock %d acquired the lock.\n", lock.ID) + + // Simulate some work + time.Sleep(time.Duration(rand.Int31n(10)*10) * time.Millisecond) + + // Release the lock + lt.ReleaseLock(key, lock) + glog.V(4).Infof("ActiveLock %d released the lock.\n", lock.ID) + }(i) + } + + wg.Wait() +} |
