diff options
| author | chrislu <chris.lu@gmail.com> | 2024-02-01 23:01:44 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-02-01 23:01:44 -0800 |
| commit | d30150dde18c21c3c3af97cd935da27e2213f8cf (patch) | |
| tree | 2f44a8ee73a5f267b674dd80a9acac43c91eba5b | |
| parent | 0aed16a9c4f981190f8b23f0519be4c2d83ace8b (diff) | |
| download | seaweedfs-d30150dde18c21c3c3af97cd935da27e2213f8cf.tar.xz seaweedfs-d30150dde18c21c3c3af97cd935da27e2213f8cf.zip | |
use a short-lived lock
| -rw-r--r-- | weed/cluster/lock_client.go | 49 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 4 |
3 files changed, 22 insertions, 35 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 3d8dae8cb..5c17e1918 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -40,57 +40,42 @@ type LiveLock struct { lc *LockClient } -// NewLock creates a lock with a very long duration -func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { - return lc.doNewLock(key, lock_manager.MaxDuration, owner) -} - -// StartLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { +// NewShortLivedLock creates a lock with a 5-second duration +func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + expireAtNs: time.Now().Add(5*time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, owner: owner, lc: lc, } - go func() { - lock.CreateLock(lock_manager.MaxDuration) - lc.keepLock(lock) - }() + lock.retryUntilLocked(5*time.Second) return } -func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { +// StartLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lockDuration).UnixNano(), + expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, owner: owner, lc: lc, } - var needRenewal bool - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - needRenewal = true - } - - lock.CreateLock(lockDuration) - - if needRenewal { - go lc.keepLock(lock) - } - + go func() { + lock.retryUntilLocked(lock_manager.MaxDuration) + lc.keepLock(lock) + }() return } -func (lock *LiveLock) CreateLock(lockDuration time.Duration) { +func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) { util.RetryUntil("create lock:"+lock.key, func() error { - return lock.DoLock(lockDuration) + return lock.AttemptToLock(lockDuration) }, func(err error) (shouldContinue bool) { if err != nil { glog.Warningf("create lock %s: %s", lock.key, err) @@ -99,7 +84,7 @@ func (lock *LiveLock) CreateLock(lockDuration time.Duration) { }) } -func (lock *LiveLock) DoLock(lockDuration time.Duration) error { +func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { errorMessage, err := lock.doLock(lockDuration) if err != nil { time.Sleep(time.Second) @@ -117,11 +102,13 @@ func (lock *LiveLock) IsLocked() bool { return lock!=nil && lock.isLocked } -func (lock *LiveLock) StopLock() error { - close(lock.cancelCh) +func (lock *LiveLock) StopShortLivedLock() error { if !lock.isLocked { return nil } + defer func() { + lock.isLocked = false + }() return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: lock.key, diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index a009af693..3a25a9691 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -92,8 +92,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial glog.V(0).Infof("BrokerConnectToBalancer: %v", err) } time.Sleep(time.Second) - if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil { - glog.V(0).Infof("DoLock: %v", err) + if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("AttemptToLock: %v", err) } } }() diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index f32273f26..eeb031cd1 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo fullpath := util.NewFullPath(req.Directory, req.EntryName) lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host) - lock := lockClient.NewLock(string(fullpath), string(fs.option.Host)) - defer lock.StopLock() + lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host)) + defer lock.StopShortLivedLock() var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, fullpath) |
