diff options
Diffstat (limited to 'weed/cluster/lock_client.go')
| -rw-r--r-- | weed/cluster/lock_client.go | 80 |
1 files changed, 64 insertions, 16 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index c76b3c9bd..57198c865 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -16,13 +16,15 @@ type LockClient struct { grpcDialOption grpc.DialOption maxLockDuration time.Duration sleepDuration time.Duration + seedFiler pb.ServerAddress } -func NewLockClient(grpcDialOption grpc.DialOption) *LockClient { +func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient { return &LockClient{ grpcDialOption: grpcDialOption, maxLockDuration: 5 * time.Second, - sleepDuration: 4 * time.Millisecond, + sleepDuration: 2473 * time.Millisecond, + seedFiler: seedFiler, } } @@ -34,25 +36,62 @@ type LiveLock struct { cancelCh chan struct{} grpcDialOption grpc.DialOption isLocked bool + owner string + lc *LockClient } -// NewLockWithTimeout locks the key with the given duration -func (lc *LockClient) NewLockWithTimeout(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { - return lc.doNewLock(filer, key, lockDuration) +// NewLock creates a lock with a very long duration +func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { + return lc.doNewLock(key, lock_manager.MaxDuration, owner) } -// NewLock creates a lock with a very long duration -func (lc *LockClient) NewLock(filer pb.ServerAddress, key string) (lock *LiveLock) { - return lc.doNewLock(filer, key, lock_manager.MaxDuration) +// StartLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { + lock = &LiveLock{ + key: key, + filer: lc.seedFiler, + cancelCh: make(chan struct{}), + expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + grpcDialOption: lc.grpcDialOption, + owner: owner, + lc: lc, + } + go func() { + util.RetryForever("create lock:"+key, func() error { + errorMessage, err := lock.doLock(lock_manager.MaxDuration) + if err != nil { + glog.Infof("create lock %s: %s", key, err) + time.Sleep(time.Second) + return err + } + if errorMessage != "" { + glog.Infof("create lock %s: %s", key, errorMessage) + time.Sleep(time.Second) + return fmt.Errorf("%v", errorMessage) + } + lock.isLocked = true + return nil + }, func(err error) (shouldContinue bool) { + if err != nil { + glog.Warningf("create lock %s: %s", key, err) + time.Sleep(time.Second) + } + return lock.renewToken == "" + }) + lc.keepLock(lock) + }() + return } -func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { +func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, - filer: filer, + filer: lc.seedFiler, cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(lockDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, + owner: owner, + lc: lc, } var needRenewal bool if lockDuration > lc.maxLockDuration { @@ -62,11 +101,14 @@ func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration util.RetryForever("create lock:"+key, func() error { errorMessage, err := lock.doLock(lockDuration) if err != nil { + time.Sleep(time.Second) return err } if errorMessage != "" { + time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } + lock.isLocked = true return nil }, func(err error) (shouldContinue bool) { if err != nil { @@ -75,8 +117,6 @@ func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration return lock.renewToken == "" }) - lock.isLocked = true - if needRenewal { go lc.keepLock(lock) } @@ -88,10 +128,13 @@ func (lock *LiveLock) IsLocked() bool { return lock.isLocked } -func (lock *LiveLock) Unlock() error { +func (lock *LiveLock) StopLock() error { close(lock.cancelCh) + if !lock.isLocked { + return nil + } return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{ + _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: lock.key, RenewToken: lock.renewToken, }) @@ -100,9 +143,10 @@ func (lock *LiveLock) Unlock() error { } func (lc *LockClient) keepLock(lock *LiveLock) { + ticker := time.Tick(lc.sleepDuration) for { select { - case <-time.After(lc.sleepDuration): + case <-ticker: // renew the lock if lock.expireAtNs is still greater than now util.RetryForever("keep lock:"+lock.key, func() error { lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond @@ -116,10 +160,12 @@ func (lc *LockClient) keepLock(lock *LiveLock) { errorMessage, err := lock.doLock(lockDuration) if err != nil { lock.isLocked = false + time.Sleep(time.Second) return err } if errorMessage != "" { lock.isLocked = false + time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } return nil @@ -141,11 +187,12 @@ func (lc *LockClient) keepLock(lock *LiveLock) { func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.Lock(context.Background(), &filer_pb.LockRequest{ + resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: lock.key, SecondsToLock: int64(lockDuration.Seconds()), RenewToken: lock.renewToken, IsMoved: false, + Owner: lock.owner, }) if err == nil { lock.renewToken = resp.RenewToken @@ -154,6 +201,7 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e errorMessage = resp.Error if resp.MovedTo != "" { lock.filer = pb.ServerAddress(resp.MovedTo) + lock.lc.seedFiler = lock.filer } } return err |
