aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/exclusive_locker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/exclusive_locker.go')
-rw-r--r--weed/shell/exclusive_locker.go100
1 files changed, 0 insertions, 100 deletions
diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go
deleted file mode 100644
index ab037efb7..000000000
--- a/weed/shell/exclusive_locker.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package shell
-
-import (
- "context"
- "sync/atomic"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-const (
- RenewInteval = 4 * time.Second
- SafeRenewInteval = 3 * time.Second
- InitLockInteval = 1 * time.Second
-)
-
-type ExclusiveLocker struct {
- masterClient *wdclient.MasterClient
- token int64
- lockTsNs int64
- isLocking bool
-}
-
-func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
- return &ExclusiveLocker{
- masterClient: masterClient,
- }
-}
-
-func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
- for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
- // wait until now is within the safe lock period, no immediate renewal to change the token
- time.Sleep(100 * time.Millisecond)
- }
- return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
-}
-
-func (l *ExclusiveLocker) RequestLock() {
- // retry to get the lease
- for {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- if err == nil {
- atomic.StoreInt64(&l.token, resp.Token)
- atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
- }
- return err
- }); err != nil {
- // println("leasing problem", err.Error())
- time.Sleep(InitLockInteval)
- } else {
- break
- }
- }
-
- l.isLocking = true
-
- // start a goroutine to renew the lease
- go func() {
- for l.isLocking {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- if err == nil {
- atomic.StoreInt64(&l.token, resp.Token)
- atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
- // println("ts", l.lockTsNs, "token", l.token)
- }
- return err
- }); err != nil {
- glog.Errorf("failed to renew lock: %v", err)
- return
- } else {
- time.Sleep(RenewInteval)
- }
-
- }
- }()
-
-}
-
-func (l *ExclusiveLocker) ReleaseLock() {
- l.isLocking = false
- l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- return nil
- })
- atomic.StoreInt64(&l.token, 0)
- atomic.StoreInt64(&l.lockTsNs, 0)
-}