diff options
| author | chrislu <chris.lu@gmail.com> | 2024-02-02 15:54:57 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-02-02 15:54:57 -0800 |
| commit | d41792461cfaae905808ec5f9a602a81f5e1cb5c (patch) | |
| tree | edcf74fe1b3752bb3a9980e163f59efac6be0a6e /weed/cluster/lock_client.go | |
| parent | d30150dde18c21c3c3af97cd935da27e2213f8cf (diff) | |
| download | seaweedfs-d41792461cfaae905808ec5f9a602a81f5e1cb5c.tar.xz seaweedfs-d41792461cfaae905808ec5f9a602a81f5e1cb5c.zip | |
lock returns host and owner
Diffstat (limited to 'weed/cluster/lock_client.go')
| -rw-r--r-- | weed/cluster/lock_client.go | 107 |
1 files changed, 49 insertions, 58 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 5c17e1918..f212f9ea0 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -35,9 +35,10 @@ type LiveLock struct { filer pb.ServerAddress cancelCh chan struct{} grpcDialOption grpc.DialOption - isLocked bool - owner string - lc *LockClient + isLocked bool + self string + lc *LockClient + owner string } // NewShortLivedLock creates a lock with a 5-second duration @@ -48,27 +49,50 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(5*time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } lock.retryUntilLocked(5*time.Second) return } -// StartLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { +// StartLongLivedLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } go func() { - lock.retryUntilLocked(lock_manager.MaxDuration) - lc.keepLock(lock) + isLocked := false + lockOwner := "" + for { + if isLocked { + if err := lock.AttemptToLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("Lost lock %s: %v", key, err) + isLocked = false + } + } else { + if err := lock.AttemptToLock(lock_manager.MaxDuration); err == nil { + isLocked = true + } + } + if lockOwner != lock.LockOwner() && lock.LockOwner() != "" { + glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner()) + onLockOwnerChange(lock.LockOwner()) + lockOwner = lock.LockOwner() + } + select { + case <-lock.cancelCh: + return + default: + time.Sleep(5*time.Second) + } + } }() return } @@ -118,51 +142,6 @@ func (lock *LiveLock) StopShortLivedLock() error { }) } -func (lc *LockClient) keepLock(lock *LiveLock) { - ticker := time.Tick(lc.sleepDuration) - for { - select { - case <-ticker: - // renew the lock if lock.expireAtNs is still greater than now - util.RetryUntil("keep lock:"+lock.key, func() error { - lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - } - if lockDuration <= 0 { - return nil - } - - errorMessage, err := lock.doLock(lockDuration) - if err != nil { - lock.isLocked = false - time.Sleep(time.Second) - glog.V(0).Infof("keep lock %s: %v", lock.key, err) - return err - } - if errorMessage != "" { - lock.isLocked = false - time.Sleep(time.Second) - glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage) - return fmt.Errorf("keep lock error: %v", errorMessage) - } - return nil - }, func(err error) (shouldContinue bool) { - if err == nil { - return false - } - glog.Warningf("keep lock %s: %v", lock.key, err) - return true - }) - if !lock.isLocked { - return - } - case <-lock.cancelCh: - return - } - } -} - func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ @@ -170,19 +149,31 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e SecondsToLock: int64(lockDuration.Seconds()), RenewToken: lock.renewToken, IsMoved: false, - Owner: lock.owner, + Owner: lock.self, }) - if err == nil { + if err == nil && resp != nil { lock.renewToken = resp.RenewToken + } else { + // this can be retried. Need to remember the last valid renewToken + // lock.renewToken = "" } if resp != nil { errorMessage = resp.Error - if resp.MovedTo != "" { - lock.filer = pb.ServerAddress(resp.MovedTo) + if resp.LockHostMovedTo != "" { + lock.filer = pb.ServerAddress(resp.LockHostMovedTo) lock.lc.seedFiler = lock.filer } + if resp.LockOwner != "" { + lock.owner = resp.LockOwner + } else { + lock.owner = "" + } } return err }) return } + +func (lock *LiveLock) LockOwner() string { + return lock.owner +} |
