aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/lock_manager/lock_manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/lock_manager/lock_manager.go')
-rw-r--r--weed/cluster/lock_manager/lock_manager.go50
1 files changed, 30 insertions, 20 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go
index 01dc58810..642c77772 100644
--- a/weed/cluster/lock_manager/lock_manager.go
+++ b/weed/cluster/lock_manager/lock_manager.go
@@ -12,9 +12,9 @@ type LockManager struct {
locks *xsync.MapOf[string, *Lock]
}
type Lock struct {
- Token string
- ExpirationNs int64
- Key string // only used for moving locks
+ Token string
+ ExpiredAtNs int64
+ Key string // only used for moving locks
}
func NewLockManager() *LockManager {
@@ -25,32 +25,37 @@ func NewLockManager() *LockManager {
return t
}
-func (lm *LockManager) Lock(path string, ttlDuration time.Duration, token string) (renewToken string, err error) {
+func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renewToken string, err error) {
lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
if oldValue != nil {
- now := time.Now()
- if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
+ if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
// lock is expired, set to a new lock
- expirationNs := time.Now().Add(ttlDuration).UnixNano()
- return &Lock{Token: token, ExpirationNs: expirationNs}, false
+ if token != "" {
+ err = fmt.Errorf("lock: non-empty token on an expired lock")
+ return nil, false
+ } else {
+ // new lock
+ renewToken = uuid.New().String()
+ return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false
+ }
}
+ // not expired
if oldValue.Token == token {
- expirationNs := time.Now().Add(ttlDuration).UnixNano()
- return &Lock{Token: token, ExpirationNs: expirationNs}, false
+ // token matches, renew the lock
+ return &Lock{Token: token, ExpiredAtNs: expiredAtNs}, false
} else {
err = fmt.Errorf("lock: token mismatch")
return oldValue, false
}
} else {
- expirationNs := time.Now().Add(ttlDuration).UnixNano()
if token == "" {
+ // new lock
renewToken = uuid.New().String()
- return &Lock{Token: renewToken, ExpirationNs: expirationNs}, false
+ return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false
} else {
err = fmt.Errorf("lock: non-empty token on a new lock")
return nil, false
}
- return &Lock{Token: token, ExpirationNs: expirationNs}, false
}
})
return
@@ -60,13 +65,13 @@ func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err e
lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
if oldValue != nil {
now := time.Now()
- if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
+ if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
// lock is expired, delete it
isUnlocked = true
return nil, true
}
if oldValue.Token == token {
- if oldValue.ExpirationNs <= now.UnixNano() {
+ if oldValue.ExpiredAtNs <= now.UnixNano() {
isUnlocked = true
return nil, true
}
@@ -89,7 +94,7 @@ func (lm *LockManager) CleanUp() {
time.Sleep(1 * time.Minute)
now := time.Now().UnixNano()
lm.locks.Range(func(key string, value *Lock) bool {
- if now > value.ExpirationNs {
+ if now > value.ExpiredAtNs {
lm.locks.Delete(key)
return true
}
@@ -98,16 +103,16 @@ func (lm *LockManager) CleanUp() {
}
}
-// TakeOutLocksByKey takes out locks by key
+// SelectLocks takes out locks by key
// if keyFn return true, the lock will be taken out
-func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*Lock) {
+func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
now := time.Now().UnixNano()
lm.locks.Range(func(key string, lock *Lock) bool {
- if now > lock.ExpirationNs {
+ if now > lock.ExpiredAtNs {
lm.locks.Delete(key)
return true
}
- if keyFn(key) {
+ if selectFn(key) {
lm.locks.Delete(key)
lock.Key = key
locks = append(locks, lock)
@@ -116,3 +121,8 @@ func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*
})
return
}
+
+// InsertLock inserts a lock unconditionally
+func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string) {
+ lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs})
+}