aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/commands.go7
-rw-r--r--weed/shell/exclusive_locker.go100
2 files changed, 4 insertions, 103 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index b7ca5d268..f61ed9f82 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks"
)
type ShellOptions struct {
@@ -28,7 +29,7 @@ type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option ShellOptions
- locker *ExclusiveLocker
+ locker *exclusive_locks.ExclusiveLocker
}
type command interface {
@@ -47,7 +48,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv {
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
option: options,
}
- ce.locker = NewExclusiveLocker(ce.MasterClient)
+ ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
return ce
}
@@ -70,7 +71,7 @@ func (ce *CommandEnv) isDirectory(path string) bool {
func (ce *CommandEnv) confirmIsLocked() error {
- if ce.locker.isLocking {
+ if ce.locker.IsLocking() {
return nil
}
diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go
deleted file mode 100644
index ab037efb7..000000000
--- a/weed/shell/exclusive_locker.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package shell
-
-import (
- "context"
- "sync/atomic"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-const (
- RenewInteval = 4 * time.Second
- SafeRenewInteval = 3 * time.Second
- InitLockInteval = 1 * time.Second
-)
-
-type ExclusiveLocker struct {
- masterClient *wdclient.MasterClient
- token int64
- lockTsNs int64
- isLocking bool
-}
-
-func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
- return &ExclusiveLocker{
- masterClient: masterClient,
- }
-}
-
-func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
- for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
- // wait until now is within the safe lock period, no immediate renewal to change the token
- time.Sleep(100 * time.Millisecond)
- }
- return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
-}
-
-func (l *ExclusiveLocker) RequestLock() {
- // retry to get the lease
- for {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- if err == nil {
- atomic.StoreInt64(&l.token, resp.Token)
- atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
- }
- return err
- }); err != nil {
- // println("leasing problem", err.Error())
- time.Sleep(InitLockInteval)
- } else {
- break
- }
- }
-
- l.isLocking = true
-
- // start a goroutine to renew the lease
- go func() {
- for l.isLocking {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- if err == nil {
- atomic.StoreInt64(&l.token, resp.Token)
- atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
- // println("ts", l.lockTsNs, "token", l.token)
- }
- return err
- }); err != nil {
- glog.Errorf("failed to renew lock: %v", err)
- return
- } else {
- time.Sleep(RenewInteval)
- }
-
- }
- }()
-
-}
-
-func (l *ExclusiveLocker) ReleaseLock() {
- l.isLocking = false
- l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
- PreviousToken: atomic.LoadInt64(&l.token),
- PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
- })
- return nil
- })
- atomic.StoreInt64(&l.token, 0)
- atomic.StoreInt64(&l.lockTsNs, 0)
-}