aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/commands.go5
-rw-r--r--weed/shell/exclusive_locker.go95
2 files changed, 99 insertions, 1 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 96343971a..2faba4280 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -28,6 +28,7 @@ type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option ShellOptions
+ locker *ExclusiveLocker
}
type command interface {
@@ -41,11 +42,13 @@ var (
)
func NewCommandEnv(options ShellOptions) *CommandEnv {
- return &CommandEnv{
+ ce := &CommandEnv{
env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
option: options,
}
+ ce.locker = NewExclusiveLocker(ce.MasterClient)
+ return ce
}
func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
diff --git a/weed/shell/exclusive_locker.go b/weed/shell/exclusive_locker.go
new file mode 100644
index 000000000..9485b255c
--- /dev/null
+++ b/weed/shell/exclusive_locker.go
@@ -0,0 +1,95 @@
+package shell
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+const (
+ RenewInteval = 4 * time.Second
+ SafeRenewInteval = 3 * time.Second
+)
+
+type ExclusiveLocker struct {
+ masterClient *wdclient.MasterClient
+ token int64
+ lockTsNs int64
+ isLocking bool
+}
+
+func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
+ return &ExclusiveLocker{
+ masterClient: masterClient,
+ }
+}
+
+func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
+ for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
+ // wait until now is within the safe lock period, no immediate renewal to change the token
+ time.Sleep(100 * time.Millisecond)
+ }
+ return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
+}
+
+func (l *ExclusiveLocker) Lock() {
+ // retry to get the lease
+ for {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ }
+ return err
+ }); err != nil {
+ time.Sleep(RenewInteval)
+ } else {
+ break
+ }
+ }
+
+ l.isLocking = true
+
+ // start a goroutine to renew the lease
+ go func() {
+ for l.isLocking {
+ if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ if err == nil {
+ atomic.StoreInt64(&l.token, resp.Token)
+ atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
+ }
+ return err
+ }); err != nil {
+ glog.Error("failed to renew lock: %v", err)
+ return
+ } else {
+ time.Sleep(RenewInteval)
+ }
+
+ }
+ }()
+
+}
+
+func (l *ExclusiveLocker) Unlock() {
+ l.isLocking = false
+ l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ PreviousToken: atomic.LoadInt64(&l.token),
+ PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
+ })
+ return nil
+ })
+}