aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-29 22:46:23 -0800
committerchrislu <chris.lu@gmail.com>2024-01-29 22:46:23 -0800
commitac50d8a8226bcdef78220003d3cade708ce1eb1e (patch)
tree45dcab7750ae04db365c4e798b7d29c412cdeafe
parent0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4 (diff)
downloadseaweedfs-ac50d8a8226bcdef78220003d3cade708ce1eb1e.tar.xz
seaweedfs-ac50d8a8226bcdef78220003d3cade708ce1eb1e.zip
re-lock if the lock owner is not found
-rw-r--r--weed/cluster/lock_client.go69
-rw-r--r--weed/cluster/lock_manager/lock_manager.go16
-rw-r--r--weed/mq/broker/broker_server.go10
-rw-r--r--weed/server/filer_grpc_server_dlm.go18
4 files changed, 55 insertions, 58 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index e222807e6..3d8dae8cb 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -57,27 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lc: lc,
}
go func() {
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lock_manager.MaxDuration)
- if err != nil {
- glog.V(0).Infof("create lock %s: %s", key, err)
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- glog.V(4).Infof("create lock %s: %s", key, errorMessage)
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- lock.isLocked = true
- return nil
- }, func(err error) (shouldContinue bool) {
- if err != nil {
- glog.Warningf("create lock %s: %s", key, err)
- time.Sleep(time.Second)
- }
- return lock.renewToken == ""
- })
+ lock.CreateLock(lock_manager.MaxDuration)
lc.keepLock(lock)
}()
return
@@ -98,30 +78,39 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
lockDuration = lc.maxLockDuration
needRenewal = true
}
- util.RetryUntil("create lock:"+key, func() error {
- errorMessage, err := lock.doLock(lockDuration)
- if err != nil {
- time.Sleep(time.Second)
- return err
- }
- if errorMessage != "" {
- time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
- }
- lock.isLocked = true
- return nil
+
+ lock.CreateLock(lockDuration)
+
+ if needRenewal {
+ go lc.keepLock(lock)
+ }
+
+ return
+}
+
+func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
+ util.RetryUntil("create lock:"+lock.key, func() error {
+ return lock.DoLock(lockDuration)
}, func(err error) (shouldContinue bool) {
if err != nil {
- glog.Warningf("create lock %s: %s", key, err)
+ glog.Warningf("create lock %s: %s", lock.key, err)
}
return lock.renewToken == ""
})
+}
- if needRenewal {
- go lc.keepLock(lock)
+func (lock *LiveLock) DoLock(lockDuration time.Duration) error {
+ errorMessage, err := lock.doLock(lockDuration)
+ if err != nil {
+ time.Sleep(time.Second)
+ return err
}
-
- return
+ if errorMessage != "" {
+ time.Sleep(time.Second)
+ return fmt.Errorf("%v", errorMessage)
+ }
+ lock.isLocked = true
+ return nil
}
func (lock *LiveLock) IsLocked() bool {
@@ -161,12 +150,14 @@ func (lc *LockClient) keepLock(lock *LiveLock) {
if err != nil {
lock.isLocked = false
time.Sleep(time.Second)
+ glog.V(0).Infof("keep lock %s: %v", lock.key, err)
return err
}
if errorMessage != "" {
lock.isLocked = false
time.Sleep(time.Second)
- return fmt.Errorf("%v", errorMessage)
+ glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage)
+ return fmt.Errorf("keep lock error: %v", errorMessage)
}
return nil
}, func(err error) (shouldContinue bool) {
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go
index 49b951dd9..acf5b93da 100644
--- a/weed/cluster/lock_manager/lock_manager.go
+++ b/weed/cluster/lock_manager/lock_manager.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/google/uuid"
"github.com/puzpuzpuz/xsync/v2"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"time"
)
@@ -11,6 +12,7 @@ var LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new
var LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock")
var LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch")
var UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch")
+var LockNotFound = fmt.Errorf("lock not found")
// LockManager local lock manager, used by distributed lock manager
type LockManager struct {
@@ -138,13 +140,11 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string,
}
func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
- lm.locks.Range(func(k string, lock *Lock) bool {
- if k == key && lock != nil {
- owner = lock.Owner
- return false
- }
- return true
- })
+ lock, _ := lm.locks.Load(key)
+ if lock != nil {
+ return lock.Owner, nil
+ }
+ glog.V(0).Infof("get lock %s %+v", key, lock)
+ err = LockNotFound
return
-
}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index f41ec87ca..a009af693 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,7 +1,7 @@
package broker
import (
- "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@@ -88,11 +88,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
for {
- err := mqBroker.BrokerConnectToBalancer(string(self))
- if err != nil {
- fmt.Printf("BrokerConnectToBalancer: %v\n", err)
+ if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil {
+ glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
}
time.Sleep(time.Second)
+ if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil {
+ glog.V(0).Infof("DoLock: %v", err)
+ }
}
}()
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go
index 0d7c801f5..a7cef4032 100644
--- a/weed/server/filer_grpc_server_dlm.go
+++ b/weed/server/filer_grpc_server_dlm.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -81,13 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo
func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
- if owner == "" {
- glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
- }
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- if !req.IsMoved && movedTo != "" {
+ if !req.IsMoved && movedTo != "" && err == lock_manager.LockNotFound {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: req.Name,
@@ -103,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock
return nil, err
}
}
+
+ if owner == "" {
+ glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
+ return nil, status.Error(codes.NotFound, err.Error())
+ }
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
return &filer_pb.FindLockOwnerResponse{
Owner: owner,
}, nil