diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-23 02:31:04 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-23 02:31:04 -0700 |
| commit | 30ee4f3291d5f94d6d41f54915b8c3860bf0c05e (patch) | |
| tree | 6f07a558d4ea0ac6eced2c1653399a979d175970 /weed/shell | |
| parent | 77873b832be692a791c3b17b079a0d14bd317bcd (diff) | |
| download | seaweedfs-30ee4f3291d5f94d6d41f54915b8c3860bf0c05e.tar.xz seaweedfs-30ee4f3291d5f94d6d41f54915b8c3860bf0c05e.zip | |
add exclusive lock library on shell
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/commands.go | 5 | ||||
| -rw-r--r-- | weed/shell/exclusive_locker.go | 95 |
2 files changed, 99 insertions, 1 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 96343971a..2faba4280 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -28,6 +28,7 @@ type CommandEnv struct { env map[string]string MasterClient *wdclient.MasterClient option ShellOptions + locker *ExclusiveLocker } type command interface { @@ -41,11 +42,13 @@ var ( ) func NewCommandEnv(options ShellOptions) *CommandEnv { - return &CommandEnv{ + ce := &CommandEnv{ env: make(map[string]string), MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")), option: options, } + ce.locker = NewExclusiveLocker(ce.MasterClient) + return ce } func (ce *CommandEnv) parseUrl(input string) (path string, err error) { diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go new file mode 100644 index 000000000..9485b255c --- /dev/null +++ b/weed/shell/exclusive_locker.go @@ -0,0 +1,95 @@ +package shell + +import ( + "context" + "sync/atomic" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +const ( + RenewInteval = 4 * time.Second + SafeRenewInteval = 3 * time.Second +) + +type ExclusiveLocker struct { + masterClient *wdclient.MasterClient + token int64 + lockTsNs int64 + isLocking bool +} + +func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker { + return &ExclusiveLocker{ + masterClient: masterClient, + } +} + +func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { + for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) { + // wait until now is within the safe lock period, no immediate renewal to change the token + time.Sleep(100 * time.Millisecond) + } + return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs) +} + +func (l *ExclusiveLocker) Lock() { + // retry to get the lease + for { + if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ + PreviousToken: atomic.LoadInt64(&l.token), + PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), + }) + if err == nil { + atomic.StoreInt64(&l.token, resp.Token) + atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) + } + return err + }); err != nil { + time.Sleep(RenewInteval) + } else { + break + } + } + + l.isLocking = true + + // start a goroutine to renew the lease + go func() { + for l.isLocking { + if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{ + PreviousToken: atomic.LoadInt64(&l.token), + PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), + }) + if err == nil { + atomic.StoreInt64(&l.token, resp.Token) + atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) + } + return err + }); err != nil { + glog.Error("failed to renew lock: %v", err) + return + } else { + time.Sleep(RenewInteval) + } + + } + }() + +} + +func (l *ExclusiveLocker) Unlock() { + l.isLocking = false + l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{ + PreviousToken: atomic.LoadInt64(&l.token), + PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), + }) + return nil + }) +} |
