diff options
Diffstat (limited to 'weed/cluster/lock_client.go')
| -rw-r--r-- | weed/cluster/lock_client.go | 200 |
1 files changed, 84 insertions, 116 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index c21f20874..6618f5d2f 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -32,107 +32,104 @@ type LiveLock struct { key string renewToken string expireAtNs int64 - filer pb.ServerAddress + hostFiler pb.ServerAddress cancelCh chan struct{} grpcDialOption grpc.DialOption isLocked bool - owner string + self string lc *LockClient + owner string } -// NewLock creates a lock with a very long duration -func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { - return lc.doNewLock(key, lock_manager.MaxDuration, owner) -} - -// StartLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { +// NewShortLivedLock creates a lock with a 5-second duration +func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, - filer: lc.seedFiler, + hostFiler: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + expireAtNs: time.Now().Add(5 * time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } - go func() { - util.RetryUntil("create lock:"+key, func() error { - errorMessage, err := lock.doLock(lock_manager.MaxDuration) - if err != nil { - glog.V(0).Infof("create lock %s: %s", key, err) - time.Sleep(time.Second) - return err - } - if errorMessage != "" { - glog.V(4).Infof("create lock %s: %s", key, errorMessage) - time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) - } - lock.isLocked = true - return nil - }, func(err error) (shouldContinue bool) { - if err != nil { - time.Sleep(time.Second) - } - return lock.renewToken == "" - }) - lc.keepLock(lock) - }() + lock.retryUntilLocked(5 * time.Second) return } -func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { +// StartLongLivedLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { lock = &LiveLock{ key: key, - filer: lc.seedFiler, + hostFiler: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lockDuration).UnixNano(), + expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(), grpcDialOption: lc.grpcDialOption, - owner: owner, + self: owner, lc: lc, } - var needRenewal bool - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - needRenewal = true - } - util.RetryUntil("create lock:"+key, func() error { - errorMessage, err := lock.doLock(lockDuration) - if err != nil { - time.Sleep(time.Second) - return err - } - if errorMessage != "" { - time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) + go func() { + isLocked := false + lockOwner := "" + for { + if isLocked { + if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil { + glog.V(0).Infof("Lost lock %s: %v", key, err) + isLocked = false + } + } else { + if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil { + isLocked = true + } + } + if lockOwner != lock.LockOwner() && lock.LockOwner() != "" { + glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner()) + onLockOwnerChange(lock.LockOwner()) + lockOwner = lock.LockOwner() + } + select { + case <-lock.cancelCh: + return + default: + time.Sleep(lock_manager.RenewInterval) + } } - lock.isLocked = true - return nil + }() + return +} + +func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) { + util.RetryUntil("create lock:"+lock.key, func() error { + return lock.AttemptToLock(lockDuration) }, func(err error) (shouldContinue bool) { if err != nil { - glog.Warningf("create lock %s: %s", key, err) + glog.Warningf("create lock %s: %s", lock.key, err) } return lock.renewToken == "" }) - - if needRenewal { - go lc.keepLock(lock) - } - - return } -func (lock *LiveLock) IsLocked() bool { - return lock.isLocked +func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { + errorMessage, err := lock.doLock(lockDuration) + if err != nil { + time.Sleep(time.Second) + return err + } + if errorMessage != "" { + time.Sleep(time.Second) + return fmt.Errorf("%v", errorMessage) + } + lock.isLocked = true + return nil } -func (lock *LiveLock) StopLock() error { - close(lock.cancelCh) +func (lock *LiveLock) StopShortLivedLock() error { if !lock.isLocked { return nil } - return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + defer func() { + lock.isLocked = false + }() + return pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: lock.key, RenewToken: lock.renewToken, @@ -141,69 +138,40 @@ func (lock *LiveLock) StopLock() error { }) } -func (lc *LockClient) keepLock(lock *LiveLock) { - ticker := time.Tick(lc.sleepDuration) - for { - select { - case <-ticker: - // renew the lock if lock.expireAtNs is still greater than now - util.RetryUntil("keep lock:"+lock.key, func() error { - lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - } - if lockDuration <= 0 { - return nil - } - - errorMessage, err := lock.doLock(lockDuration) - if err != nil { - lock.isLocked = false - time.Sleep(time.Second) - return err - } - if errorMessage != "" { - lock.isLocked = false - time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) - } - return nil - }, func(err error) (shouldContinue bool) { - if err == nil { - return false - } - glog.Warningf("keep lock %s: %v", lock.key, err) - return true - }) - if !lock.isLocked { - return - } - case <-lock.cancelCh: - return - } - } -} - func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { - err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: lock.key, SecondsToLock: int64(lockDuration.Seconds()), RenewToken: lock.renewToken, IsMoved: false, - Owner: lock.owner, + Owner: lock.self, }) - if err == nil { + if err == nil && resp != nil { lock.renewToken = resp.RenewToken + } else { + //this can be retried. Need to remember the last valid renewToken + lock.renewToken = "" } if resp != nil { errorMessage = resp.Error - if resp.MovedTo != "" { - lock.filer = pb.ServerAddress(resp.MovedTo) - lock.lc.seedFiler = lock.filer + if resp.LockHostMovedTo != "" { + lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) + lock.lc.seedFiler = lock.hostFiler + } + if resp.LockOwner != "" { + lock.owner = resp.LockOwner + // fmt.Printf("lock %s owner: %s\n", lock.key, lock.owner) + } else { + // fmt.Printf("lock %s has no owner\n", lock.key) + lock.owner = "" } } return err }) return } + +func (lock *LiveLock) LockOwner() string { + return lock.owner +} |
