diff options
Diffstat (limited to 'weed/cluster/lock_manager/distributed_lock_manager.go')
| -rw-r--r-- | weed/cluster/lock_manager/distributed_lock_manager.go | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go index 7577fb830..7061dfd1a 100644 --- a/weed/cluster/lock_manager/distributed_lock_manager.go +++ b/weed/cluster/lock_manager/distributed_lock_manager.go @@ -6,21 +6,29 @@ import ( "time" ) +const MaxDuration = 1<<63 - 1 + var NoLockServerError = fmt.Errorf("no lock server found") type DistributedLockManager struct { lockManager *LockManager LockRing *LockRing + Host pb.ServerAddress } -func NewDistributedLockManager() *DistributedLockManager { +func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager { return &DistributedLockManager{ lockManager: NewLockManager(), LockRing: NewLockRing(time.Second * 5), + Host: host, } } -func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) Lock(key string, token string) (renewToken string, movedTo pb.ServerAddress, err error) { + return dlm.LockWithTimeout(key, MaxDuration, token) +} + +func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { err = NoLockServerError @@ -28,7 +36,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir } server := hashKeyToServer(key, servers) - if server != host { + if server != dlm.Host { movedTo = server return } @@ -36,7 +44,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir return } -func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, token string) (movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { err = NoLockServerError @@ -44,7 +52,7 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok } server := hashKeyToServer(key, servers) - if server != host { + if server != dlm.Host { movedTo = server return } @@ -57,12 +65,20 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) { dlm.lockManager.InsertLock(key, expiredAtNs, token) } -func (dlm *DistributedLockManager) SelectNotOwnedLocks(host pb.ServerAddress, servers []pb.ServerAddress) (locks []*Lock) { +func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) { return dlm.lockManager.SelectLocks(func(key string) bool { server := hashKeyToServer(key, servers) - return server != host + return server != dlm.Host }) } func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress { return hashKeyToServer(key, servers) } + +func (dlm *DistributedLockManager) IsLocal(key string) bool { + servers := dlm.LockRing.GetSnapshot() + if len(servers) <= 1 { + return true + } + return hashKeyToServer(key, servers) == dlm.Host +} |
