1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
package cluster
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"time"
)
type LockClient struct {
grpcDialOption grpc.DialOption
maxLockDuration time.Duration
sleepDuration time.Duration
seedFiler pb.ServerAddress
}
func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
return &LockClient{
grpcDialOption: grpcDialOption,
maxLockDuration: 5 * time.Second,
sleepDuration: 2473 * time.Millisecond,
seedFiler: seedFiler,
}
}
type LiveLock struct {
key string
renewToken string
expireAtNs int64
hostFiler pb.ServerAddress
cancelCh chan struct{}
grpcDialOption grpc.DialOption
isLocked bool
self string
lc *LockClient
owner string
}
// NewShortLivedLock creates a lock with a 5-second duration
func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
lock = &LiveLock{
key: key,
hostFiler: lc.seedFiler,
cancelCh: make(chan struct{}),
expireAtNs: time.Now().Add(5 * time.Second).UnixNano(),
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
}
lock.retryUntilLocked(5 * time.Second)
return
}
// StartLongLivedLock starts a goroutine to lock the key and returns immediately.
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
lock = &LiveLock{
key: key,
hostFiler: lc.seedFiler,
cancelCh: make(chan struct{}),
expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
}
go func() {
isLocked := false
lockOwner := ""
for {
if isLocked {
if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
log.V(3).Infof("Lost lock %s: %v", key, err)
isLocked = false
}
} else {
if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
isLocked = true
}
}
if lockOwner != lock.LockOwner() && lock.LockOwner() != "" {
log.V(3).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner())
onLockOwnerChange(lock.LockOwner())
lockOwner = lock.LockOwner()
}
select {
case <-lock.cancelCh:
return
default:
time.Sleep(lock_manager.RenewInterval)
}
}
}()
return
}
func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
util.RetryUntil("create lock:"+lock.key, func() error {
return lock.AttemptToLock(lockDuration)
}, func(err error) (shouldContinue bool) {
if err != nil {
log.Warningf("create lock %s: %s", lock.key, err)
}
return lock.renewToken == ""
})
}
func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
errorMessage, err := lock.doLock(lockDuration)
if err != nil {
time.Sleep(time.Second)
return err
}
if errorMessage != "" {
time.Sleep(time.Second)
return fmt.Errorf("%v", errorMessage)
}
lock.isLocked = true
return nil
}
func (lock *LiveLock) StopShortLivedLock() error {
if !lock.isLocked {
return nil
}
defer func() {
lock.isLocked = false
}()
return pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
Name: lock.key,
RenewToken: lock.renewToken,
})
return err
})
}
func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
Name: lock.key,
SecondsToLock: int64(lockDuration.Seconds()),
RenewToken: lock.renewToken,
IsMoved: false,
Owner: lock.self,
})
if err == nil && resp != nil {
lock.renewToken = resp.RenewToken
} else {
//this can be retried. Need to remember the last valid renewToken
lock.renewToken = ""
}
if resp != nil {
errorMessage = resp.Error
if resp.LockHostMovedTo != "" {
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo)
lock.lc.seedFiler = lock.hostFiler
}
if resp.LockOwner != "" {
lock.owner = resp.LockOwner
// fmt.Printf("lock %s owner: %s\n", lock.key, lock.owner)
} else {
// fmt.Printf("lock %s has no owner\n", lock.key)
lock.owner = ""
}
}
return err
})
return
}
func (lock *LiveLock) LockOwner() string {
return lock.owner
}
|