aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go79
1 files changed, 45 insertions, 34 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 03112cb08..175718cd2 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -19,10 +19,13 @@ const (
type ExclusiveLocker struct {
token int64
lockTsNs int64
- isLocked bool
+ isLocked atomic.Bool
masterClient *wdclient.MasterClient
lockName string
message string
+ clientName string
+ // Each lock has and only has one goroutine
+ renewGoroutineRunning atomic.Bool
}
func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker {
@@ -33,7 +36,7 @@ func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *E
}
func (l *ExclusiveLocker) IsLocked() bool {
- return l.isLocked
+ return l.isLocked.Load()
}
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
@@ -45,7 +48,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
}
func (l *ExclusiveLocker) RequestLock(clientName string) {
- if l.isLocked {
+ if l.isLocked.Load() {
return
}
@@ -74,43 +77,51 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
}
}
- l.isLocked = true
-
- // start a goroutine to renew the lease
- go func() {
- ctx2, cancel2 := context.WithCancel(context.Background())
- defer cancel2()
-
- for l.isLocked {
- if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- LockName: l.lockName,
- ClientName: clientName,
- Message: l.message,
- })
- if err == nil {
- atomic.StoreInt64(&l.token, resp.Token)
- atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
- // println("ts", l.lockTsNs, "token", l.token)
+ l.isLocked.Store(true)
+ l.clientName = clientName
+
+ // Each lock has and only has one goroutine
+ if l.renewGoroutineRunning.CompareAndSwap(false, true) {
+ // start a goroutine to renew the lease
+ go func() {
+ ctx2, cancel2 := context.WithCancel(context.Background())
+ defer cancel2()
+
+ for {
+ if l.isLocked.Load() {
+ if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ LockName: l.lockName,
+ ClientName: l.clientName,
+ Message: l.message,
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ // println("ts", l.lockTsNs, "token", l.token)
+ }
+ return err
+ }); err != nil {
+ glog.Errorf("failed to renew lock: %v", err)
+ l.isLocked.Store(false)
+ return
+ } else {
+ time.Sleep(RenewInterval)
+ }
+ } else {
+ time.Sleep(RenewInterval)
}
- return err
- }); err != nil {
- glog.Errorf("failed to renew lock: %v", err)
- l.isLocked = false
- return
- } else {
- time.Sleep(RenewInterval)
}
-
- }
- }()
+ }()
+ }
}
func (l *ExclusiveLocker) ReleaseLock() {
- l.isLocked = false
+ l.isLocked.Store(false)
+ l.clientName = ""
ctx, cancel := context.WithCancel(context.Background())
defer cancel()