aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/lock_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/lock_client.go')
-rw-r--r--weed/cluster/lock_client.go69
1 files changed, 30 insertions, 39 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index e222807e6..3d8dae8cb 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -57,27 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lc: lc,
}
go func() {
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lock_manager.MaxDuration)
- if err != nil {
- glog.V(0).Infof("create lock %s: %s", key, err)
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- glog.V(4).Infof("create lock %s: %s", key, errorMessage)
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- lock.isLocked = true
- return nil
- }, func(err error) (shouldContinue bool) {
- if err != nil {
- glog.Warningf("create lock %s: %s", key, err)
- time.Sleep(time.Second)
- }
- return lock.renewToken == ""
- })
+ lock.CreateLock(lock_manager.MaxDuration)
lc.keepLock(lock)
}()
return
@@ -98,30 +78,39 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
lockDuration = lc.maxLockDuration
needRenewal = true
}
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lockDuration)
- if err != nil {
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- lock.isLocked = true
- return nil
+
+ lock.CreateLock(lockDuration)
+
+ if needRenewal {
+ go lc.keepLock(lock)
+ }
+
+ return
+}
+
+func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
+ util.RetryUntil("create lock:"+lock.key, func() error {
+ return lock.DoLock(lockDuration)
}, func(err error) (shouldContinue bool) {
if err != nil {
- glog.Warningf("create lock %s: %s", key, err)
+ glog.Warningf("create lock %s: %s", lock.key, err)
}
return lock.renewToken == ""
})
+}
- if needRenewal {
- go lc.keepLock(lock)
+func (lock *LiveLock) DoLock(lockDuration time.Duration) error {
+ errorMessage, err := lock.doLock(lockDuration)
+ if err != nil {
+ time.Sleep(time.Second)
+ return err
}
-
- return
+ if errorMessage != "" {
+ time.Sleep(time.Second)
+ return fmt.Errorf("%v", errorMessage)
+ }
+ lock.isLocked = true
+ return nil
}
func (lock *LiveLock) IsLocked() bool {
@@ -161,12 +150,14 @@ func (lc *LockClient) keepLock(lock *LiveLock) {
if err != nil {
lock.isLocked = false
time.Sleep(time.Second)
+ glog.V(0).Infof("keep lock %s: %v", lock.key, err)
return err
}
if errorMessage != "" {
lock.isLocked = false
time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
+ glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage)
+ return fmt.Errorf("keep lock error: %v", errorMessage)
}
return nil
}, func(err error) (shouldContinue bool) {