aboutsummaryrefslogtreecommitdiff
path: root/weed/util/lock_table.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-21 10:24:34 -0700
committerchrislu <chris.lu@gmail.com>2023-09-21 10:24:34 -0700
commite3b1bacf3fa0467d836ccd5ed0096146f2afbb07 (patch)
tree281a191841b28becd2e6e3c31ada35c254ab89db /weed/util/lock_table.go
parent411bdda08dc4e902246ee387ae3abe71309e4586 (diff)
downloadseaweedfs-e3b1bacf3fa0467d836ccd5ed0096146f2afbb07.tar.xz
seaweedfs-e3b1bacf3fa0467d836ccd5ed0096146f2afbb07.zip
add lock table to ensure ordered locks
Diffstat (limited to 'weed/util/lock_table.go')
-rw-r--r--weed/util/lock_table.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/weed/util/lock_table.go b/weed/util/lock_table.go
new file mode 100644
index 000000000..32d98bea3
--- /dev/null
+++ b/weed/util/lock_table.go
@@ -0,0 +1,146 @@
+package util
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+)
+
+// LockTable is a table of locks that can be acquired.
+// Locks are acquired in order of request.
+type LockTable[T comparable] struct {
+ mu sync.Mutex
+ locks map[T]*LockEntry
+ lockIdSeq int64
+}
+
+type LockEntry struct {
+ mu sync.Mutex
+ waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
+ activeLockOwnerCount int32
+ lockType LockType
+ cond *sync.Cond
+}
+
+type LockType int
+
+const (
+ SharedLock LockType = iota
+ ExclusiveLock
+)
+
+type ActiveLock struct {
+ ID int64
+ isDeleted bool
+ intention string // for debugging
+}
+
+func NewLockTable[T comparable]() *LockTable[T] {
+ return &LockTable[T]{
+ locks: make(map[T]*LockEntry),
+ }
+}
+
+func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
+ id := atomic.AddInt64(&lt.lockIdSeq, 1)
+ l := &ActiveLock{ID: id, intention: intention}
+ return l
+}
+
+func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
+ lt.mu.Lock()
+ // Get or create the lock entry for the key
+ entry, exists := lt.locks[key]
+ if !exists {
+ entry = &LockEntry{}
+ entry.cond = sync.NewCond(&entry.mu)
+ lt.locks[key] = entry
+ }
+ lt.mu.Unlock()
+
+ lock = lt.NewActiveLock(intention)
+
+ // If the lock is held exclusively, wait
+ entry.mu.Lock()
+ if len(entry.waiters) > 0 || lockType == ExclusiveLock {
+ fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
+ if len(entry.waiters) > 0 {
+ for _, waiter := range entry.waiters {
+ fmt.Printf(" %d", waiter.ID)
+ }
+ fmt.Printf("\n")
+ }
+ entry.waiters = append(entry.waiters, lock)
+ if lockType == ExclusiveLock {
+ for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
+ entry.cond.Wait()
+ }
+ } else {
+ for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
+ entry.cond.Wait()
+ }
+ }
+ // Remove the transaction from the waiters list
+ if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
+ entry.waiters = entry.waiters[1:]
+ entry.cond.Broadcast()
+ }
+ }
+ entry.activeLockOwnerCount++
+
+ // Otherwise, grant the lock
+ entry.lockType = lockType
+ fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
+ if len(entry.waiters) > 0 {
+ for _, waiter := range entry.waiters {
+ fmt.Printf(" %d", waiter.ID)
+ }
+ fmt.Printf("\n")
+ }
+ entry.mu.Unlock()
+
+ return lock
+}
+
+func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
+ lt.mu.Lock()
+ defer lt.mu.Unlock()
+
+ entry, exists := lt.locks[key]
+ if !exists {
+ return
+ }
+
+ entry.mu.Lock()
+ defer entry.mu.Unlock()
+
+ // Remove the transaction from the waiters list
+ for i, waiter := range entry.waiters {
+ if waiter == lock {
+ waiter.isDeleted = true
+ entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
+ break
+ }
+ }
+
+ // If there are no waiters, release the lock
+ if len(entry.waiters) == 0 {
+ delete(lt.locks, key)
+ }
+
+ fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
+ if len(entry.waiters) > 0 {
+ for _, waiter := range entry.waiters {
+ fmt.Printf(" %d", waiter.ID)
+ }
+ fmt.Printf("\n")
+ }
+ entry.activeLockOwnerCount--
+
+ // Notify the next waiter
+ entry.cond.Broadcast()
+}
+
+func main() {
+
+}