diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2023-09-22 21:38:25 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-09-22 21:38:25 -0700 |
| commit | 9fc79bebf97be1c52b824fea03a431320ca097c1 (patch) | |
| tree | 9cc17ea8305abda04023e2ba32421e37ebe408a3 /weed/util/lock_table.go | |
| parent | 48cd33ca68dd411cbf0cf9cdaeb2a05fe19e9d9a (diff) | |
| parent | 248b16bc099267eb76e02f6fe45d7009942325a9 (diff) | |
| download | seaweedfs-9fc79bebf97be1c52b824fea03a431320ca097c1.tar.xz seaweedfs-9fc79bebf97be1c52b824fea03a431320ca097c1.zip | |
Merge branch 'master' into track-mount-e2e
Diffstat (limited to 'weed/util/lock_table.go')
| -rw-r--r-- | weed/util/lock_table.go | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/weed/util/lock_table.go b/weed/util/lock_table.go new file mode 100644 index 000000000..d10b51334 --- /dev/null +++ b/weed/util/lock_table.go @@ -0,0 +1,153 @@ +package util + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "sync" + "sync/atomic" +) + +// LockTable is a table of locks that can be acquired. +// Locks are acquired in order of request. +type LockTable[T comparable] struct { + mu sync.Mutex + locks map[T]*LockEntry + lockIdSeq int64 +} + +type LockEntry struct { + mu sync.Mutex + waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks + activeLockOwnerCount int32 + lockType LockType + cond *sync.Cond +} + +type LockType int + +const ( + SharedLock LockType = iota + ExclusiveLock +) + +type ActiveLock struct { + ID int64 + isDeleted bool + intention string // for debugging +} + +func NewLockTable[T comparable]() *LockTable[T] { + return &LockTable[T]{ + locks: make(map[T]*LockEntry), + } +} + +func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock { + id := atomic.AddInt64(<.lockIdSeq, 1) + l := &ActiveLock{ID: id, intention: intention} + return l +} + +func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) { + lt.mu.Lock() + // Get or create the lock entry for the key + entry, exists := lt.locks[key] + if !exists { + entry = &LockEntry{} + entry.cond = sync.NewCond(&entry.mu) + lt.locks[key] = entry + } + lt.mu.Unlock() + + lock = lt.NewActiveLock(intention) + + // If the lock is held exclusively, wait + entry.mu.Lock() + if len(entry.waiters) > 0 || lockType == ExclusiveLock { + if glog.V(4) { + fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + } + entry.waiters = append(entry.waiters, lock) + if lockType == ExclusiveLock { + for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) { + entry.cond.Wait() + } + } else { + for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) { + entry.cond.Wait() + } + } + // Remove the transaction from the waiters list + if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID { + entry.waiters = entry.waiters[1:] + entry.cond.Broadcast() + } + } + entry.activeLockOwnerCount++ + + // Otherwise, grant the lock + entry.lockType = lockType + if glog.V(4) { + fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + } + entry.mu.Unlock() + + return lock +} + +func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) { + lt.mu.Lock() + defer lt.mu.Unlock() + + entry, exists := lt.locks[key] + if !exists { + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + // Remove the transaction from the waiters list + for i, waiter := range entry.waiters { + if waiter == lock { + waiter.isDeleted = true + entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...) + break + } + } + + // If there are no waiters, release the lock + if len(entry.waiters) == 0 { + delete(lt.locks, key) + } + + if glog.V(4) { + fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + } + entry.activeLockOwnerCount-- + + // Notify the next waiter + entry.cond.Broadcast() +} + +func main() { + +} |
