diff options
| author | chrislu <chris.lu@gmail.com> | 2024-02-02 15:54:57 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-02-02 15:54:57 -0800 |
| commit | d41792461cfaae905808ec5f9a602a81f5e1cb5c (patch) | |
| tree | edcf74fe1b3752bb3a9980e163f59efac6be0a6e /weed/cluster | |
| parent | d30150dde18c21c3c3af97cd935da27e2213f8cf (diff) | |
| download | seaweedfs-d41792461cfaae905808ec5f9a602a81f5e1cb5c.tar.xz seaweedfs-d41792461cfaae905808ec5f9a602a81f5e1cb5c.zip | |
lock returns host and owner
Diffstat (limited to 'weed/cluster')
| -rw-r--r-- | weed/cluster/lock_client.go | 107 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/distributed_lock_manager.go | 4 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 3 |
3 files changed, 53 insertions, 61 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 5c17e1918..f212f9ea0 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -35,9 +35,10 @@ type LiveLock struct { filer pb.ServerAddress cancelCh chan struct{} grpcDialOption grpc.DialOption - isLocked bool - owner string - lc *LockClient + isLocked bool + self string + lc *LockClient + owner string } // NewShortLivedLock creates a lock with a 5-second duration @@ -48,27 +49,50 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(5*time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } lock.retryUntilLocked(5*time.Second) return } -// StartLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { +// StartLongLivedLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } go func() { - lock.retryUntilLocked(lock_manager.MaxDuration) - lc.keepLock(lock) + isLocked := false + lockOwner := "" + for { + if isLocked { + if err := lock.AttemptToLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("Lost lock %s: %v", key, err) + isLocked = false + } + } else { + if err := lock.AttemptToLock(lock_manager.MaxDuration); err == nil { + isLocked = true + } + } + if lockOwner != lock.LockOwner() && lock.LockOwner() != "" { + glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner()) + onLockOwnerChange(lock.LockOwner()) + lockOwner = lock.LockOwner() + } + select { + case <-lock.cancelCh: + return + default: + time.Sleep(5*time.Second) + } + } }() return } @@ -118,51 +142,6 @@ func (lock *LiveLock) StopShortLivedLock() error { }) } -func (lc *LockClient) keepLock(lock *LiveLock) { - ticker := time.Tick(lc.sleepDuration) - for { - select { - case <-ticker: - // renew the lock if lock.expireAtNs is still greater than now - util.RetryUntil("keep lock:"+lock.key, func() error { - lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - } - if lockDuration <= 0 { - return nil - } - - errorMessage, err := lock.doLock(lockDuration) - if err != nil { - lock.isLocked = false - time.Sleep(time.Second) - glog.V(0).Infof("keep lock %s: %v", lock.key, err) - return err - } - if errorMessage != "" { - lock.isLocked = false - time.Sleep(time.Second) - glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage) - return fmt.Errorf("keep lock error: %v", errorMessage) - } - return nil - }, func(err error) (shouldContinue bool) { - if err == nil { - return false - } - glog.Warningf("keep lock %s: %v", lock.key, err) - return true - }) - if !lock.isLocked { - return - } - case <-lock.cancelCh: - return - } - } -} - func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ @@ -170,19 +149,31 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e SecondsToLock: int64(lockDuration.Seconds()), RenewToken: lock.renewToken, IsMoved: false, - Owner: lock.owner, + Owner: lock.self, }) - if err == nil { + if err == nil && resp != nil { lock.renewToken = resp.RenewToken + } else { + // this can be retried. Need to remember the last valid renewToken + // lock.renewToken = "" } if resp != nil { errorMessage = resp.Error - if resp.MovedTo != "" { - lock.filer = pb.ServerAddress(resp.MovedTo) + if resp.LockHostMovedTo != "" { + lock.filer = pb.ServerAddress(resp.LockHostMovedTo) lock.lc.seedFiler = lock.filer } + if resp.LockOwner != "" { + lock.owner = resp.LockOwner + } else { + lock.owner = "" + } } return err }) return } + +func (lock *LiveLock) LockOwner() string { + return lock.owner +} diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go index 43e836461..6676c0a67 100644 --- a/weed/cluster/lock_manager/distributed_lock_manager.go +++ b/weed/cluster/lock_manager/distributed_lock_manager.go @@ -25,7 +25,7 @@ func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager { } } -func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string, owner string) (renewToken string, movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string, owner string) (lockOwner string, renewToken string, movedTo pb.ServerAddress, err error) { movedTo, err = dlm.findLockOwningFiler(key) if err != nil { return @@ -33,7 +33,7 @@ func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64 if movedTo != dlm.Host { return } - renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token, owner) + lockOwner, renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token, owner) return } diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index acf5b93da..a619ccbbe 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -33,7 +33,7 @@ func NewLockManager() *LockManager { return t } -func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (renewToken string, err error) { +func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (lockOwner, renewToken string, err error) { lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { if oldValue != nil { if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() { @@ -48,6 +48,7 @@ func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner } } // not expired + lockOwner = oldValue.Owner if oldValue.Token == token { // token matches, renew the lock renewToken = uuid.New().String() |
