diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2020-05-27 13:33:00 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-27 13:33:00 +0800 |
| commit | 73264b952b51c5fd57e84f7e73b8667e5b09b95e (patch) | |
| tree | 48395cc0e17454bd2bcd0e89a80224dde8c7aeff /weed/shell | |
| parent | ecdeef8c66eadbf9b4e8ac7f221444d1d92a9de0 (diff) | |
| parent | ef2b3a0801ba1a56f97460a591957942c194942d (diff) | |
| download | seaweedfs-73264b952b51c5fd57e84f7e73b8667e5b09b95e.tar.xz seaweedfs-73264b952b51c5fd57e84f7e73b8667e5b09b95e.zip | |
Merge pull request #1 from chrislusf/master
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/commands.go | 7 | ||||
| -rw-r--r-- | weed/shell/exclusive_locker.go | 100 |
2 files changed, 4 insertions, 103 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b7ca5d268..f61ed9f82 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks" ) type ShellOptions struct { @@ -28,7 +29,7 @@ type CommandEnv struct { env map[string]string MasterClient *wdclient.MasterClient option ShellOptions - locker *ExclusiveLocker + locker *exclusive_locks.ExclusiveLocker } type command interface { @@ -47,7 +48,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv { MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")), option: options, } - ce.locker = NewExclusiveLocker(ce.MasterClient) + ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) return ce } @@ -70,7 +71,7 @@ func (ce *CommandEnv) isDirectory(path string) bool { func (ce *CommandEnv) confirmIsLocked() error { - if ce.locker.isLocking { + if ce.locker.IsLocking() { return nil } diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go deleted file mode 100644 index ab037efb7..000000000 --- a/weed/shell/exclusive_locker.go +++ /dev/null @@ -1,100 +0,0 @@ -package shell - -import ( - "context" - "sync/atomic" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -const ( - RenewInteval = 4 * time.Second - SafeRenewInteval = 3 * time.Second - InitLockInteval = 1 * time.Second -) - -type ExclusiveLocker struct { - masterClient *wdclient.MasterClient - token int64 - lockTsNs int64 - isLocking bool -} - -func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker { - return &ExclusiveLocker{ - masterClient: masterClient, - } -} - -func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { - for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) { - // wait until now is within the safe lock period, no immediate renewal to change the token - time.Sleep(100 * time.Millisecond) - } - return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs) -} - -func (l *ExclusiveLocker) RequestLock() { - // retry to get the lease - for { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - }) - if err == nil { - atomic.StoreInt64(&l.token, resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - } - return err - }); err != nil { - // println("leasing problem", err.Error()) - time.Sleep(InitLockInteval) - } else { - break - } - } - - l.isLocking = true - - // start a goroutine to renew the lease - go func() { - for l.isLocking { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - }) - if err == nil { - atomic.StoreInt64(&l.token, resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - // println("ts", l.lockTsNs, "token", l.token) - } - return err - }); err != nil { - glog.Errorf("failed to renew lock: %v", err) - return - } else { - time.Sleep(RenewInteval) - } - - } - }() - -} - -func (l *ExclusiveLocker) ReleaseLock() { - l.isLocking = false - l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { - client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - }) - return nil - }) - atomic.StoreInt64(&l.token, 0) - atomic.StoreInt64(&l.lockTsNs, 0) -} |
