diff options
Diffstat (limited to 'weed/cluster')
| -rw-r--r-- | weed/cluster/lock_client.go | 80 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/distributed_lock_manager.go | 28 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 25 |
3 files changed, 101 insertions, 32 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index c76b3c9bd..57198c865 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -16,13 +16,15 @@ type LockClient struct { grpcDialOption grpc.DialOption maxLockDuration time.Duration sleepDuration time.Duration + seedFiler pb.ServerAddress } -func NewLockClient(grpcDialOption grpc.DialOption) *LockClient { +func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient { return &LockClient{ grpcDialOption: grpcDialOption, maxLockDuration: 5 * time.Second, - sleepDuration: 4 * time.Millisecond, + sleepDuration: 2473 * time.Millisecond, + seedFiler: seedFiler, } } @@ -34,25 +36,62 @@ type LiveLock struct { cancelCh chan struct{} grpcDialOption grpc.DialOption isLocked bool + owner string + lc *LockClient } -// NewLockWithTimeout locks the key with the given duration -func (lc *LockClient) NewLockWithTimeout(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { - return lc.doNewLock(filer, key, lockDuration) +// NewLock creates a lock with a very long duration +func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { + return lc.doNewLock(key, lock_manager.MaxDuration, owner) } -// NewLock creates a lock with a very long duration -func (lc *LockClient) NewLock(filer pb.ServerAddress, key string) (lock *LiveLock) { - return lc.doNewLock(filer, key, lock_manager.MaxDuration) +// StartLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { + lock = &LiveLock{ + key: key, + filer: lc.seedFiler, + cancelCh: make(chan struct{}), + expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + grpcDialOption: lc.grpcDialOption, + owner: owner, + lc: lc, + } + go func() { + util.RetryForever("create lock:"+key, func() error { + errorMessage, err := lock.doLock(lock_manager.MaxDuration) + if err != nil { + glog.Infof("create lock %s: %s", key, err) + time.Sleep(time.Second) + return err + } + if errorMessage != "" { + glog.Infof("create lock %s: %s", key, errorMessage) + time.Sleep(time.Second) + return fmt.Errorf("%v", errorMessage) + } + lock.isLocked = true + return nil + }, func(err error) (shouldContinue bool) { + if err != nil { + glog.Warningf("create lock %s: %s", key, err) + time.Sleep(time.Second) + } + return lock.renewToken == "" + }) + lc.keepLock(lock) + }() + return } -func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { +func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, - filer: filer, + filer: lc.seedFiler, cancelCh: make(chan struct{}), expireAtNs: time.Now().Add(lockDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, + owner: owner, + lc: lc, } var needRenewal bool if lockDuration > lc.maxLockDuration { @@ -62,11 +101,14 @@ func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration util.RetryForever("create lock:"+key, func() error { errorMessage, err := lock.doLock(lockDuration) if err != nil { + time.Sleep(time.Second) return err } if errorMessage != "" { + time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } + lock.isLocked = true return nil }, func(err error) (shouldContinue bool) { if err != nil { @@ -75,8 +117,6 @@ func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration return lock.renewToken == "" }) - lock.isLocked = true - if needRenewal { go lc.keepLock(lock) } @@ -88,10 +128,13 @@ func (lock *LiveLock) IsLocked() bool { return lock.isLocked } -func (lock *LiveLock) Unlock() error { +func (lock *LiveLock) StopLock() error { close(lock.cancelCh) + if !lock.isLocked { + return nil + } return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{ + _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: lock.key, RenewToken: lock.renewToken, }) @@ -100,9 +143,10 @@ func (lock *LiveLock) Unlock() error { } func (lc *LockClient) keepLock(lock *LiveLock) { + ticker := time.Tick(lc.sleepDuration) for { select { - case <-time.After(lc.sleepDuration): + case <-ticker: // renew the lock if lock.expireAtNs is still greater than now util.RetryForever("keep lock:"+lock.key, func() error { lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond @@ -116,10 +160,12 @@ func (lc *LockClient) keepLock(lock *LiveLock) { errorMessage, err := lock.doLock(lockDuration) if err != nil { lock.isLocked = false + time.Sleep(time.Second) return err } if errorMessage != "" { lock.isLocked = false + time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } return nil @@ -141,11 +187,12 @@ func (lc *LockClient) keepLock(lock *LiveLock) { func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.Lock(context.Background(), &filer_pb.LockRequest{ + resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: lock.key, SecondsToLock: int64(lockDuration.Seconds()), RenewToken: lock.renewToken, IsMoved: false, + Owner: lock.owner, }) if err == nil { lock.renewToken = resp.RenewToken @@ -154,6 +201,7 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e errorMessage = resp.Error if resp.MovedTo != "" { lock.filer = pb.ServerAddress(resp.MovedTo) + lock.lc.seedFiler = lock.filer } } return err diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go index 0378d02b2..9479b38b0 100644 --- a/weed/cluster/lock_manager/distributed_lock_manager.go +++ b/weed/cluster/lock_manager/distributed_lock_manager.go @@ -24,23 +24,19 @@ func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager { } } -func (dlm *DistributedLockManager) Lock(key string, token string) (renewToken string, movedTo pb.ServerAddress, err error) { - return dlm.LockWithTimeout(key, MaxDuration, token) -} - -func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { - movedTo, err = dlm.FindLockOwner(key) +func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string, owner string) (renewToken string, movedTo pb.ServerAddress, err error) { + movedTo, err = dlm.findLockOwningFiler(key) if err != nil { return } if movedTo != dlm.Host { return } - renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token) + renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token, owner) return } -func (dlm *DistributedLockManager) FindLockOwner(key string) (movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) findLockOwningFiler(key string) (movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { err = NoLockServerError @@ -51,6 +47,18 @@ func (dlm *DistributedLockManager) FindLockOwner(key string) (movedTo pb.ServerA return } +func (dlm *DistributedLockManager) FindLockOwner(key string) (owner string, movedTo pb.ServerAddress, err error) { + movedTo, err = dlm.findLockOwningFiler(key) + if err != nil { + return + } + if movedTo != dlm.Host { + return + } + owner, err = dlm.lockManager.GetLockOwner(key) + return +} + func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { @@ -69,8 +77,8 @@ func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb. // InsertLock is used to insert a lock to a server unconditionally // It is used when a server is down and the lock is moved to another server -func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) { - dlm.lockManager.InsertLock(key, expiredAtNs, token) +func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string, owner string) { + dlm.lockManager.InsertLock(key, expiredAtNs, token, owner) } func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) { return dlm.lockManager.SelectLocks(func(key string) bool { diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index 6943a8084..0e3e47ba4 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -20,6 +20,7 @@ type Lock struct { Token string ExpiredAtNs int64 Key string // only used for moving locks + Owner string } func NewLockManager() *LockManager { @@ -30,7 +31,7 @@ func NewLockManager() *LockManager { return t } -func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renewToken string, err error) { +func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (renewToken string, err error) { lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { if oldValue != nil { if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() { @@ -41,14 +42,14 @@ func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renew } else { // new lock renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false + return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false } } // not expired if oldValue.Token == token { // token matches, renew the lock renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false + return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false } else { err = LockErrorTokenMismatch return oldValue, false @@ -57,7 +58,7 @@ func (lm *LockManager) Lock(path string, expiredAtNs int64, token string) (renew if token == "" { // new lock renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs}, false + return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false } else { err = LockErrorNonEmptyTokenOnNewLock return nil, false @@ -132,6 +133,18 @@ func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Loc } // InsertLock inserts a lock unconditionally -func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string) { - lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs}) +func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) { + lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner}) +} + +func (lm *LockManager) GetLockOwner(key string) (owner string, err error) { + lm.locks.Range(func(k string, lock *Lock) bool { + if k == key { + owner = lock.Owner + return false + } + return true + }) + return + } |
