aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2020-05-27 13:33:00 +0800
committerGitHub <noreply@github.com>2020-05-27 13:33:00 +0800
commit73264b952b51c5fd57e84f7e73b8667e5b09b95e (patch)
tree48395cc0e17454bd2bcd0e89a80224dde8c7aeff /weed/wdclient
parentecdeef8c66eadbf9b4e8ac7f221444d1d92a9de0 (diff)
parentef2b3a0801ba1a56f97460a591957942c194942d (diff)
downloadseaweedfs-73264b952b51c5fd57e84f7e73b8667e5b09b95e.tar.xz
seaweedfs-73264b952b51c5fd57e84f7e73b8667e5b09b95e.zip
Merge pull request #1 from chrislusf/master
Diffstat (limited to 'weed/wdclient')
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go111
1 files changed, 111 insertions, 0 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
new file mode 100644
index 000000000..67823e7f4
--- /dev/null
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -0,0 +1,111 @@
+package exclusive_locks
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+const (
+ RenewInteval = 4 * time.Second
+ SafeRenewInteval = 3 * time.Second
+ InitLockInteval = 1 * time.Second
+ AdminLockName = "admin"
+)
+
+type ExclusiveLocker struct {
+ masterClient *wdclient.MasterClient
+ token int64
+ lockTsNs int64
+ isLocking bool
+}
+
+func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
+ return &ExclusiveLocker{
+ masterClient: masterClient,
+ }
+}
+func (l *ExclusiveLocker) IsLocking() bool {
+ return l.isLocking
+}
+
+func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
+ for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
+ // wait until now is within the safe lock period, no immediate renewal to change the token
+ time.Sleep(100 * time.Millisecond)
+ }
+ return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
+}
+
+func (l *ExclusiveLocker) RequestLock() {
+ if l.isLocking {
+ return
+ }
+
+ // retry to get the lease
+ for {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ LockName: AdminLockName,
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ }
+ return err
+ }); err != nil {
+ // println("leasing problem", err.Error())
+ time.Sleep(InitLockInteval)
+ } else {
+ break
+ }
+ }
+
+ l.isLocking = true
+
+ // start a goroutine to renew the lease
+ go func() {
+ for l.isLocking {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ LockName: AdminLockName,
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ // println("ts", l.lockTsNs, "token", l.token)
+ }
+ return err
+ }); err != nil {
+ glog.Errorf("failed to renew lock: %v", err)
+ return
+ } else {
+ time.Sleep(RenewInteval)
+ }
+
+ }
+ }()
+
+}
+
+func (l *ExclusiveLocker) ReleaseLock() {
+ l.isLocking = false
+ l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ LockName: AdminLockName,
+ })
+ return nil
+ })
+ atomic.StoreInt64(&l.token, 0)
+ atomic.StoreInt64(&l.lockTsNs, 0)
+}