aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/exclusive_locker.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-23 02:31:04 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-23 02:31:04 -0700
commit30ee4f3291d5f94d6d41f54915b8c3860bf0c05e (patch)
tree6f07a558d4ea0ac6eced2c1653399a979d175970 /weed/shell/exclusive_locker.go
parent77873b832be692a791c3b17b079a0d14bd317bcd (diff)
downloadseaweedfs-30ee4f3291d5f94d6d41f54915b8c3860bf0c05e.tar.xz
seaweedfs-30ee4f3291d5f94d6d41f54915b8c3860bf0c05e.zip
add exclusive lock library on shell
Diffstat (limited to 'weed/shell/exclusive_locker.go')
-rw-r--r--weed/shell/exclusive_locker.go95
1 files changed, 95 insertions, 0 deletions
diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go
new file mode 100644
index 000000000..9485b255c
--- /dev/null
+++ b/weed/shell/exclusive_locker.go
@@ -0,0 +1,95 @@
+package shell
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+const (
+ RenewInteval = 4 * time.Second
+ SafeRenewInteval = 3 * time.Second
+)
+
+type ExclusiveLocker struct {
+ masterClient *wdclient.MasterClient
+ token int64
+ lockTsNs int64
+ isLocking bool
+}
+
+func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
+ return &ExclusiveLocker{
+ masterClient: masterClient,
+ }
+}
+
+func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
+ for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
+ // wait until now is within the safe lock period, no immediate renewal to change the token
+ time.Sleep(100 * time.Millisecond)
+ }
+ return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
+}
+
+func (l *ExclusiveLocker) Lock() {
+ // retry to get the lease
+ for {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ }
+ return err
+ }); err != nil {
+ time.Sleep(RenewInteval)
+ } else {
+ break
+ }
+ }
+
+ l.isLocking = true
+
+ // start a goroutine to renew the lease
+ go func() {
+ for l.isLocking {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ }
+ return err
+ }); err != nil {
+ glog.Error("failed to renew lock: %v", err)
+ return
+ } else {
+ time.Sleep(RenewInteval)
+ }
+
+ }
+ }()
+
+}
+
+func (l *ExclusiveLocker) Unlock() {
+ l.isLocking = false
+ l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ return nil
+ })
+}