aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/lock_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/lock_client.go')
-rw-r--r--weed/cluster/lock_client.go200
1 files changed, 84 insertions, 116 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index c21f20874..6618f5d2f 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -32,107 +32,104 @@ type LiveLock struct {
key string
renewToken string
expireAtNs int64
- filer pb.ServerAddress
+ hostFiler pb.ServerAddress
cancelCh chan struct{}
grpcDialOption grpc.DialOption
isLocked bool
- owner string
+ self string
lc *LockClient
+ owner string
}
-// NewLock creates a lock with a very long duration
-func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) {
- return lc.doNewLock(key, lock_manager.MaxDuration, owner)
-}
-
-// StartLock starts a goroutine to lock the key and returns immediately.
-func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
+// NewShortLivedLock creates a lock with a 5-second duration
+func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
lock = &LiveLock{
key: key,
- filer: lc.seedFiler,
+ hostFiler: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
+ expireAtNs: time.Now().Add(5 * time.Second).UnixNano(),
grpcDialOption: lc.grpcDialOption,
- owner: owner,
+ self: owner,
lc: lc,
}
- go func() {
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lock_manager.MaxDuration)
- if err != nil {
- glog.V(0).Infof("create lock %s: %s", key, err)
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- glog.V(4).Infof("create lock %s: %s", key, errorMessage)
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- lock.isLocked = true
- return nil
- }, func(err error) (shouldContinue bool) {
- if err != nil {
- time.Sleep(time.Second)
- }
- return lock.renewToken == ""
- })
- lc.keepLock(lock)
- }()
+ lock.retryUntilLocked(5 * time.Second)
return
}
-func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) {
+// StartLongLivedLock starts a goroutine to lock the key and returns immediately.
+func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
lock = &LiveLock{
key: key,
- filer: lc.seedFiler,
+ hostFiler: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(lockDuration).UnixNano(),
+ expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
grpcDialOption: lc.grpcDialOption,
- owner: owner,
+ self: owner,
lc: lc,
}
- var needRenewal bool
- if lockDuration > lc.maxLockDuration {
- lockDuration = lc.maxLockDuration
- needRenewal = true
- }
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lockDuration)
- if err != nil {
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
+ go func() {
+ isLocked := false
+ lockOwner := ""
+ for {
+ if isLocked {
+ if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
+ glog.V(0).Infof("Lost lock %s: %v", key, err)
+ isLocked = false
+ }
+ } else {
+ if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
+ isLocked = true
+ }
+ }
+ if lockOwner != lock.LockOwner() && lock.LockOwner() != "" {
+ glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner())
+ onLockOwnerChange(lock.LockOwner())
+ lockOwner = lock.LockOwner()
+ }
+ select {
+ case <-lock.cancelCh:
+ return
+ default:
+ time.Sleep(lock_manager.RenewInterval)
+ }
}
- lock.isLocked = true
- return nil
+ }()
+ return
+}
+
+func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
+ util.RetryUntil("create lock:"+lock.key, func() error {
+ return lock.AttemptToLock(lockDuration)
}, func(err error) (shouldContinue bool) {
if err != nil {
- glog.Warningf("create lock %s: %s", key, err)
+ glog.Warningf("create lock %s: %s", lock.key, err)
}
return lock.renewToken == ""
})
-
- if needRenewal {
- go lc.keepLock(lock)
- }
-
- return
}
-func (lock *LiveLock) IsLocked() bool {
- return lock.isLocked
+func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
+ errorMessage, err := lock.doLock(lockDuration)
+ if err != nil {
+ time.Sleep(time.Second)
+ return err
+ }
+ if errorMessage != "" {
+ time.Sleep(time.Second)
+ return fmt.Errorf("%v", errorMessage)
+ }
+ lock.isLocked = true
+ return nil
}
-func (lock *LiveLock) StopLock() error {
- close(lock.cancelCh)
+func (lock *LiveLock) StopShortLivedLock() error {
if !lock.isLocked {
return nil
}
- return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ defer func() {
+ lock.isLocked = false
+ }()
+ return pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
Name: lock.key,
RenewToken: lock.renewToken,
@@ -141,69 +138,40 @@ func (lock *LiveLock) StopLock() error {
})
}
-func (lc *LockClient) keepLock(lock *LiveLock) {
- ticker := time.Tick(lc.sleepDuration)
- for {
- select {
- case <-ticker:
- // renew the lock if lock.expireAtNs is still greater than now
- util.RetryUntil("keep lock:"+lock.key, func() error {
- lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
- if lockDuration > lc.maxLockDuration {
- lockDuration = lc.maxLockDuration
- }
- if lockDuration <= 0 {
- return nil
- }
-
- errorMessage, err := lock.doLock(lockDuration)
- if err != nil {
- lock.isLocked = false
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- lock.isLocked = false
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- return nil
- }, func(err error) (shouldContinue bool) {
- if err == nil {
- return false
- }
- glog.Warningf("keep lock %s: %v", lock.key, err)
- return true
- })
- if !lock.isLocked {
- return
- }
- case <-lock.cancelCh:
- return
- }
- }
-}
-
func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
- err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
Name: lock.key,
SecondsToLock: int64(lockDuration.Seconds()),
RenewToken: lock.renewToken,
IsMoved: false,
- Owner: lock.owner,
+ Owner: lock.self,
})
- if err == nil {
+ if err == nil && resp != nil {
lock.renewToken = resp.RenewToken
+ } else {
+ //this can be retried. Need to remember the last valid renewToken
+ lock.renewToken = ""
}
if resp != nil {
errorMessage = resp.Error
- if resp.MovedTo != "" {
- lock.filer = pb.ServerAddress(resp.MovedTo)
- lock.lc.seedFiler = lock.filer
+ if resp.LockHostMovedTo != "" {
+ lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo)
+ lock.lc.seedFiler = lock.hostFiler
+ }
+ if resp.LockOwner != "" {
+ lock.owner = resp.LockOwner
+ // fmt.Printf("lock %s owner: %s\n", lock.key, lock.owner)
+ } else {
+ // fmt.Printf("lock %s has no owner\n", lock.key)
+ lock.owner = ""
}
}
return err
})
return
}
+
+func (lock *LiveLock) LockOwner() string {
+ return lock.owner
+}