aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/lock_client.go49
-rw-r--r--weed/mq/broker/broker_server.go4
-rw-r--r--weed/server/filer_grpc_server.go4
3 files changed, 22 insertions, 35 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index 3d8dae8cb..5c17e1918 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -40,57 +40,42 @@ type LiveLock struct {
lc *LockClient
}
-// NewLock creates a lock with a very long duration
-func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) {
- return lc.doNewLock(key, lock_manager.MaxDuration, owner)
-}
-
-// StartLock starts a goroutine to lock the key and returns immediately.
-func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
+// NewShortLivedLock creates a lock with a 5-second duration
+func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
lock = &LiveLock{
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
+ expireAtNs: time.Now().Add(5*time.Second).UnixNano(),
grpcDialOption: lc.grpcDialOption,
owner: owner,
lc: lc,
}
- go func() {
- lock.CreateLock(lock_manager.MaxDuration)
- lc.keepLock(lock)
- }()
+ lock.retryUntilLocked(5*time.Second)
return
}
-func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) {
+// StartLock starts a goroutine to lock the key and returns immediately.
+func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lock = &LiveLock{
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
- expireAtNs: time.Now().Add(lockDuration).UnixNano(),
+ expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
grpcDialOption: lc.grpcDialOption,
owner: owner,
lc: lc,
}
- var needRenewal bool
- if lockDuration > lc.maxLockDuration {
- lockDuration = lc.maxLockDuration
- needRenewal = true
- }
-
- lock.CreateLock(lockDuration)
-
- if needRenewal {
- go lc.keepLock(lock)
- }
-
+ go func() {
+ lock.retryUntilLocked(lock_manager.MaxDuration)
+ lc.keepLock(lock)
+ }()
return
}
-func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
+func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
util.RetryUntil("create lock:"+lock.key, func() error {
- return lock.DoLock(lockDuration)
+ return lock.AttemptToLock(lockDuration)
}, func(err error) (shouldContinue bool) {
if err != nil {
glog.Warningf("create lock %s: %s", lock.key, err)
@@ -99,7 +84,7 @@ func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
})
}
-func (lock *LiveLock) DoLock(lockDuration time.Duration) error {
+func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
errorMessage, err := lock.doLock(lockDuration)
if err != nil {
time.Sleep(time.Second)
@@ -117,11 +102,13 @@ func (lock *LiveLock) IsLocked() bool {
return lock!=nil && lock.isLocked
}
-func (lock *LiveLock) StopLock() error {
- close(lock.cancelCh)
+func (lock *LiveLock) StopShortLivedLock() error {
if !lock.isLocked {
return nil
}
+ defer func() {
+ lock.isLocked = false
+ }()
return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
Name: lock.key,
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index a009af693..3a25a9691 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -92,8 +92,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
}
time.Sleep(time.Second)
- if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil {
- glog.V(0).Infof("DoLock: %v", err)
+ if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil {
+ glog.V(0).Infof("AttemptToLock: %v", err)
}
}
}()
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index f32273f26..eeb031cd1 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
fullpath := util.NewFullPath(req.Directory, req.EntryName)
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
- lock := lockClient.NewLock(string(fullpath), string(fs.option.Host))
- defer lock.StopLock()
+ lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
+ defer lock.StopShortLivedLock()
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, fullpath)