aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2024-02-04 09:20:21 -0800
committerChris Lu <chris.lu@gmail.com>2024-02-04 09:20:21 -0800
commit0a12301b3d3eb560d8f50c459cb58e62aea7d753 (patch)
tree4236e83fc7bd2998927078c0e22ddffd4439a62f
parent1b5ba4190cd9d7b0324ba004a9cb2677310e55d1 (diff)
downloadseaweedfs-0a12301b3d3eb560d8f50c459cb58e62aea7d753.tar.xz
seaweedfs-0a12301b3d3eb560d8f50c459cb58e62aea7d753.zip
avoid too large expiration time
-rw-r--r--weed/cluster/lock_client.go28
-rw-r--r--weed/cluster/lock_manager/distributed_lock_manager.go3
-rw-r--r--weed/cluster/lock_manager/lock_manager.go169
-rw-r--r--weed/mq/broker/broker_server.go2
4 files changed, 118 insertions, 84 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index f212f9ea0..82d6785a1 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -35,10 +35,10 @@ type LiveLock struct {
filer pb.ServerAddress
cancelCh chan struct{}
grpcDialOption grpc.DialOption
- isLocked bool
- self string
- lc *LockClient
- owner string
+ isLocked bool
+ self string
+ lc *LockClient
+ owner string
}
// NewShortLivedLock creates a lock with a 5-second duration
@@ -47,12 +47,12 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(5*time.Second).UnixNano(),
+ expireAtNs: time.Now().Add(5 * time.Second).UnixNano(),
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
}
- lock.retryUntilLocked(5*time.Second)
+ lock.retryUntilLocked(5 * time.Second)
return
}
@@ -62,7 +62,7 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
+ expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
@@ -72,12 +72,12 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
lockOwner := ""
for {
if isLocked {
- if err := lock.AttemptToLock(lock_manager.MaxDuration); err != nil {
+ if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
glog.V(0).Infof("Lost lock %s: %v", key, err)
isLocked = false
}
} else {
- if err := lock.AttemptToLock(lock_manager.MaxDuration); err == nil {
+ if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
isLocked = true
}
}
@@ -90,7 +90,7 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
case <-lock.cancelCh:
return
default:
- time.Sleep(5*time.Second)
+ time.Sleep(lock_manager.RenewInterval)
}
}
}()
@@ -111,10 +111,12 @@ func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
errorMessage, err := lock.doLock(lockDuration)
if err != nil {
+ glog.Warningf("lock1 %s: %v", lock.key, err)
time.Sleep(time.Second)
return err
}
if errorMessage != "" {
+ glog.Warningf("lock2 %s: %v", lock.key, errorMessage)
time.Sleep(time.Second)
return fmt.Errorf("%v", errorMessage)
}
@@ -123,7 +125,7 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
}
func (lock *LiveLock) IsLocked() bool {
- return lock!=nil && lock.isLocked
+ return lock != nil && lock.isLocked
}
func (lock *LiveLock) StopShortLivedLock() error {
@@ -154,8 +156,8 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e
if err == nil && resp != nil {
lock.renewToken = resp.RenewToken
} else {
- // this can be retried. Need to remember the last valid renewToken
- // lock.renewToken = ""
+ //this can be retried. Need to remember the last valid renewToken
+ lock.renewToken = ""
}
if resp != nil {
errorMessage = resp.Error
diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go
index 8d7a20dbb..fe2fb5213 100644
--- a/weed/cluster/lock_manager/distributed_lock_manager.go
+++ b/weed/cluster/lock_manager/distributed_lock_manager.go
@@ -7,7 +7,8 @@ import (
"time"
)
-const MaxDuration = time.Hour * 24 * 365 * 100
+const RenewInterval = time.Second * 3
+const LiveLockTTL = time.Second * 7
var NoLockServerError = fmt.Errorf("no lock server found")
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go
index a619ccbbe..ebc9dfeaa 100644
--- a/weed/cluster/lock_manager/lock_manager.go
+++ b/weed/cluster/lock_manager/lock_manager.go
@@ -3,8 +3,8 @@ package lock_manager
import (
"fmt"
"github.com/google/uuid"
- "github.com/puzpuzpuz/xsync/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "sync"
"time"
)
@@ -16,7 +16,8 @@ var LockNotFound = fmt.Errorf("lock not found")
// LockManager local lock manager, used by distributed lock manager
type LockManager struct {
- locks *xsync.MapOf[string, *Lock]
+ locks map[string]*Lock
+ accessLock sync.RWMutex
}
type Lock struct {
Token string
@@ -27,125 +28,155 @@ type Lock struct {
func NewLockManager() *LockManager {
t := &LockManager{
- locks: xsync.NewMapOf[*Lock](),
+ locks: make(map[string]*Lock),
}
go t.CleanUp()
return t
}
func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (lockOwner, renewToken string, err error) {
- lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
- if oldValue != nil {
- if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
- // lock is expired, set to a new lock
- if token != "" {
- err = LockErrorNonEmptyTokenOnExpiredLock
- return nil, false
- } else {
- // new lock
- renewToken = uuid.New().String()
- return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
- }
- }
- // not expired
- lockOwner = oldValue.Owner
- if oldValue.Token == token {
- // token matches, renew the lock
- renewToken = uuid.New().String()
- return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
+ lm.accessLock.Lock()
+ defer lm.accessLock.Unlock()
+
+ glog.V(4).Infof("lock %s %v %v %v", path, time.Unix(0, expiredAtNs), token, owner)
+
+ if oldValue, found := lm.locks[path]; found {
+ if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
+ // lock is expired, set to a new lock
+ if token != "" {
+ glog.V(4).Infof("lock expired key %s non-empty token %v owner %v ts %s", path, token, owner, time.Unix(0, oldValue.ExpiredAtNs))
+ err = LockErrorNonEmptyTokenOnExpiredLock
+ return
} else {
- err = LockErrorTokenMismatch
- return oldValue, false
+ // new lock
+ renewToken = uuid.New().String()
+ glog.V(4).Infof("key %s new token %v owner %v", path, renewToken, owner)
+ lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
+ return
}
+ }
+ // not expired
+ lockOwner = oldValue.Owner
+ if oldValue.Token == token {
+ // token matches, renew the lock
+ renewToken = uuid.New().String()
+ glog.V(4).Infof("key %s old token %v owner %v => %v owner %v", path, oldValue.Token, oldValue.Owner, renewToken, owner)
+ lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
+ return
} else {
if token == "" {
// new lock
- renewToken = uuid.New().String()
- return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
- } else {
- err = LockErrorNonEmptyTokenOnNewLock
- return nil, false
+ glog.V(4).Infof("key %s locked by %v", path, oldValue.Owner)
+ err = fmt.Errorf("lock already owned by %v", oldValue.Owner)
+ return
}
+ glog.V(4).Infof("key %s expected token %v owner %v received %v from %v", path, oldValue.Token, oldValue.Owner, token, owner)
+ err = fmt.Errorf("lock: token mismatch")
+ return
}
- })
- return
+ } else {
+ glog.V(4).Infof("key %s no lock owner %v", path, owner)
+ if token == "" {
+ // new lock
+ glog.V(4).Infof("key %s new token %v owner %v", path, token, owner)
+ renewToken = uuid.New().String()
+ lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
+ return
+ } else {
+ glog.V(4).Infof("key %s non-empty token %v owner %v", path, token, owner)
+ err = LockErrorNonEmptyTokenOnNewLock
+ return
+ }
+ }
}
func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) {
- lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
- if oldValue != nil {
- now := time.Now()
- if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
- // lock is expired, delete it
- isUnlocked = true
- return nil, true
- }
- if oldValue.Token == token {
- if oldValue.ExpiredAtNs <= now.UnixNano() {
- isUnlocked = true
- return nil, true
- }
- return oldValue, false
- } else {
- isUnlocked = false
- err = UnlockErrorTokenMismatch
- return oldValue, false
- }
- } else {
+ lm.accessLock.Lock()
+ defer lm.accessLock.Unlock()
+
+ if oldValue, found := lm.locks[path]; found {
+ now := time.Now()
+ if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
+ // lock is expired, delete it
+ isUnlocked = true
+ glog.V(4).Infof("key %s expired at %v", path, time.Unix(0, oldValue.ExpiredAtNs))
+ delete(lm.locks, path)
+ return
+ }
+ if oldValue.Token == token {
isUnlocked = true
- return nil, true
+ glog.V(4).Infof("key %s unlocked with %v", path, token)
+ delete(lm.locks, path)
+ return
+ } else {
+ isUnlocked = false
+ err = UnlockErrorTokenMismatch
+ return
}
- })
+ }
+ err = LockNotFound
return
}
func (lm *LockManager) CleanUp() {
+
for {
time.Sleep(1 * time.Minute)
now := time.Now().UnixNano()
- lm.locks.Range(func(key string, value *Lock) bool {
+
+ lm.accessLock.Lock()
+ for key, value := range lm.locks {
if value == nil {
- return true
+ continue
}
if now > value.ExpiredAtNs {
- lm.locks.Delete(key)
- return true
+ glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, value.ExpiredAtNs))
+ delete(lm.locks, key)
}
- return true
- })
+ }
+ lm.accessLock.Unlock()
}
}
// SelectLocks takes out locks by key
// if keyFn return true, the lock will be taken out
func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
+ lm.accessLock.RLock()
+ defer lm.accessLock.RUnlock()
+
now := time.Now().UnixNano()
- lm.locks.Range(func(key string, lock *Lock) bool {
+
+ for key, lock := range lm.locks {
if now > lock.ExpiredAtNs {
- lm.locks.Delete(key)
- return true
+ glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, lock.ExpiredAtNs))
+ delete(lm.locks, key)
+ continue
}
if selectFn(key) {
- lm.locks.Delete(key)
+ glog.V(4).Infof("key %s selected and deleted", key)
+ delete(lm.locks, key)
lock.Key = key
locks = append(locks, lock)
}
- return true
- })
+ }
return
}
// InsertLock inserts a lock unconditionally
func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) {
- lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner})
+ lm.accessLock.Lock()
+ defer lm.accessLock.Unlock()
+
+ lm.locks[path] = &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner}
}
func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
- lock, _ := lm.locks.Load(key)
- if lock != nil {
+ lm.accessLock.RLock()
+ defer lm.accessLock.RUnlock()
+
+ if lock, found := lm.locks[key]; found {
return lock.Owner, nil
}
- glog.V(0).Infof("get lock %s %+v", key, lock)
err = LockNotFound
return
}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 3a18c3971..6a9e475d1 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -95,7 +95,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
for {
time.Sleep(time.Second)
- if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil {
+ if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.RenewInterval); err != nil {
glog.V(0).Infof("AttemptToLock: %v", err)
}
}