diff options
Diffstat (limited to 'weed/cluster/lock_manager/lock_manager.go')
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 50 |
1 files changed, 30 insertions, 20 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index 01dc58810..642c77772 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -12,9 +12,9 @@ type LockManager struct { locks *xsync.MapOf[string, *Lock] } type Lock struct { - Token string - ExpirationNs int64 - Key string // only used for moving locks + Token string + ExpiredAtNs int64 + Key string // only used for moving locks } func NewLockManager() *LockManager { @@ -25,32 +25,37 @@ func NewLockManager() *LockManager { return t } -func (lm *LockManager) Lock(path string, ttlDuration time.Duration, token string) (renewToken string, err error) { +func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renewToken string, err error) { lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { if oldValue != nil { - now := time.Now() - if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() { + if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() { // lock is expired, set to a new lock - expirationNs := time.Now().Add(ttlDuration).UnixNano() - return &Lock{Token: token, ExpirationNs: expirationNs}, false + if token != "" { + err = fmt.Errorf("lock: non-empty token on an expired lock") + return nil, false + } else { + // new lock + renewToken = uuid.New().String() + return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false + } } + // not expired if oldValue.Token == token { - expirationNs := time.Now().Add(ttlDuration).UnixNano() - return &Lock{Token: token, ExpirationNs: expirationNs}, false + // token matches, renew the lock + return &Lock{Token: token, ExpiredAtNs: expiredAtNs}, false } else { err = fmt.Errorf("lock: token mismatch") return oldValue, false } } else { - expirationNs := time.Now().Add(ttlDuration).UnixNano() if token == "" { + // new lock renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpirationNs: expirationNs}, false + return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false } else { err = fmt.Errorf("lock: non-empty token on a new lock") return nil, false } - return &Lock{Token: token, ExpirationNs: expirationNs}, false } }) return @@ -60,13 +65,13 @@ func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err e lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { if oldValue != nil { now := time.Now() - if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() { + if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() { // lock is expired, delete it isUnlocked = true return nil, true } if oldValue.Token == token { - if oldValue.ExpirationNs <= now.UnixNano() { + if oldValue.ExpiredAtNs <= now.UnixNano() { isUnlocked = true return nil, true } @@ -89,7 +94,7 @@ func (lm *LockManager) CleanUp() { time.Sleep(1 * time.Minute) now := time.Now().UnixNano() lm.locks.Range(func(key string, value *Lock) bool { - if now > value.ExpirationNs { + if now > value.ExpiredAtNs { lm.locks.Delete(key) return true } @@ -98,16 +103,16 @@ func (lm *LockManager) CleanUp() { } } -// TakeOutLocksByKey takes out locks by key +// SelectLocks takes out locks by key // if keyFn return true, the lock will be taken out -func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*Lock) { +func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) { now := time.Now().UnixNano() lm.locks.Range(func(key string, lock *Lock) bool { - if now > lock.ExpirationNs { + if now > lock.ExpiredAtNs { lm.locks.Delete(key) return true } - if keyFn(key) { + if selectFn(key) { lm.locks.Delete(key) lock.Key = key locks = append(locks, lock) @@ -116,3 +121,8 @@ func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []* }) return } + +// InsertLock inserts a lock unconditionally +func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string) { + lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs}) +} |
