aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/exclusive_locker.go
blob: 9485b255c211aa97af7fd369f530d3f81cbd2756 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package shell

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
)

const (
	RenewInteval     = 4 * time.Second
	SafeRenewInteval = 3 * time.Second
)

type ExclusiveLocker struct {
	masterClient *wdclient.MasterClient
	token        int64
	lockTsNs     int64
	isLocking    bool
}

func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
	return &ExclusiveLocker{
		masterClient: masterClient,
	}
}

func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
	for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
		// wait until now is within the safe lock period, no immediate renewal to change the token
		time.Sleep(100 * time.Millisecond)
	}
	return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
}

func (l *ExclusiveLocker) Lock() {
	// retry to get the lease
	for {
		if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
			resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
				PreviousToken:    atomic.LoadInt64(&l.token),
				PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
			})
			if err == nil {
				atomic.StoreInt64(&l.token, resp.Token)
				atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
			}
			return err
		}); err != nil {
			time.Sleep(RenewInteval)
		} else {
			break
		}
	}

	l.isLocking = true

	// start a goroutine to renew the lease
	go func() {
		for l.isLocking {
			if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
				resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
					PreviousToken:    atomic.LoadInt64(&l.token),
					PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
				})
				if err == nil {
					atomic.StoreInt64(&l.token, resp.Token)
					atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
				}
				return err
			}); err != nil {
				glog.Error("failed to renew lock: %v", err)
				return
			} else {
				time.Sleep(RenewInteval)
			}

		}
	}()

}

func (l *ExclusiveLocker) Unlock() {
	l.isLocking = false
	l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
		client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
			PreviousToken:    atomic.LoadInt64(&l.token),
			PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
		})
		return nil
	})
}