diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/commands.go | 7 | ||||
| -rw-r--r-- | weed/shell/exclusive_locker.go | 104 |
2 files changed, 4 insertions, 107 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b7ca5d268..f61ed9f82 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks" ) type ShellOptions struct { @@ -28,7 +29,7 @@ type CommandEnv struct { env map[string]string MasterClient *wdclient.MasterClient option ShellOptions - locker *ExclusiveLocker + locker *exclusive_locks.ExclusiveLocker } type command interface { @@ -47,7 +48,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv { MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")), option: options, } - ce.locker = NewExclusiveLocker(ce.MasterClient) + ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) return ce } @@ -70,7 +71,7 @@ func (ce *CommandEnv) isDirectory(path string) bool { func (ce *CommandEnv) confirmIsLocked() error { - if ce.locker.isLocking { + if ce.locker.IsLocking() { return nil } diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go deleted file mode 100644 index fa1f9ab5d..000000000 --- a/weed/shell/exclusive_locker.go +++ /dev/null @@ -1,104 +0,0 @@ -package shell - -import ( - "context" - "sync/atomic" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -const ( - RenewInteval = 4 * time.Second - SafeRenewInteval = 3 * time.Second - InitLockInteval = 1 * time.Second - AdminLockName = "admin" -) - -type ExclusiveLocker struct { - masterClient *wdclient.MasterClient - token int64 - lockTsNs int64 - isLocking bool -} - -func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker { - return &ExclusiveLocker{ - masterClient: masterClient, - } -} - -func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { - for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) { - // wait until now is within the safe lock period, no immediate renewal to change the token - time.Sleep(100 * time.Millisecond) - } - return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs) -} - -func (l *ExclusiveLocker) RequestLock() { - // retry to get the lease - for { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - LockName: AdminLockName, - }) - if err == nil { - atomic.StoreInt64(&l.token, resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - } - return err - }); err != nil { - // println("leasing problem", err.Error()) - time.Sleep(InitLockInteval) - } else { - break - } - } - - l.isLocking = true - - // start a goroutine to renew the lease - go func() { - for l.isLocking { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - LockName: AdminLockName, - }) - if err == nil { - atomic.StoreInt64(&l.token, resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - // println("ts", l.lockTsNs, "token", l.token) - } - return err - }); err != nil { - glog.Errorf("failed to renew lock: %v", err) - return - } else { - time.Sleep(RenewInteval) - } - - } - }() - -} - -func (l *ExclusiveLocker) ReleaseLock() { - l.isLocking = false - l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - LockName: AdminLockName, - }) - return nil - }) - atomic.StoreInt64(&l.token, 0) - atomic.StoreInt64(&l.lockTsNs, 0) -} |
