diff options
| author | chrislu <chris.lu@gmail.com> | 2023-06-25 14:14:40 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-06-25 14:14:40 -0700 |
| commit | 464a71a37359e4b7ce5e101431ecbbdfe199f209 (patch) | |
| tree | 3117176d156111a71a65fdf04750e08e5974efd0 /weed/server/filer_grpc_server_dlm.go | |
| parent | ee4f7cd636de97b9b0ede0c11090099e2a4e6d3b (diff) | |
| download | seaweedfs-464a71a37359e4b7ce5e101431ecbbdfe199f209.tar.xz seaweedfs-464a71a37359e4b7ce5e101431ecbbdfe199f209.zip | |
add distributed lock manager
Diffstat (limited to 'weed/server/filer_grpc_server_dlm.go')
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 46 |
1 files changed, 13 insertions, 33 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index fdd299947..6618285e0 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -18,21 +17,17 @@ func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (res if snapshot == nil { resp.Error = "no lock server found" return - } else { - server := lock_manager.HashKeyToServer(req.Name, snapshot) - if server != fs.option.Host { - resp.Error = fmt.Sprintf("not the lock server for %s", req.Name) - resp.MovedTo = string(server) - return - } } - renewToken, err := fs.dlm.Lock(req.Name, time.Duration(req.SecondsToLock)*time.Second, req.PreviousLockToken) + var movedTo pb.ServerAddress + expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() + resp.RenewToken, movedTo, err = fs.dlm.Lock(fs.option.Host, req.Name, expiredAtNs, req.PreviousLockToken, snapshot) if err != nil { resp.Error = fmt.Sprintf("%v", err) - } else { - resp.RenewToken = renewToken + } + if movedTo != "" { + resp.MovedTo = string(movedTo) } return resp, nil @@ -46,16 +41,9 @@ func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) if snapshot == nil { resp.Error = "no lock server found" return - } else { - server := lock_manager.HashKeyToServer(req.Name, snapshot) - if server != fs.option.Host { - resp.Error = fmt.Sprintf("not the lock server for %s", req.Name) - resp.MovedTo = string(server) - return - } } - _, err = fs.dlm.Unlock(req.Name, req.LockToken) + _, err = fs.dlm.Unlock(fs.option.Host, req.Name, req.LockToken, snapshot) if err != nil { resp.Error = fmt.Sprintf("%v", err) } @@ -67,13 +55,8 @@ func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) // TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) { - now := time.Now() for _, lock := range req.Locks { - duration := time.Duration(lock.ExpirationNs - now.UnixNano()) - if _, err := fs.dlm.Lock(lock.Name, duration, lock.RenewToken); err != nil { - glog.Errorf("receive transferred lock %v to %v: %v", lock.Name, fs.option.Host, err) - return nil, err - } + fs.dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken) } return &filer_pb.TransferLocksResponse{}, nil @@ -81,23 +64,20 @@ func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.Transfer } func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { - locks := fs.dlm.TakeOutLocksByKey(func(key string) bool { - server := lock_manager.HashKeyToServer(key, snapshot) - return server != fs.option.Host - }) + locks := fs.dlm.SelectNotOwnedLocks(fs.option.Host, snapshot) if len(locks) == 0 { return } for _, lock := range locks { - server := lock_manager.HashKeyToServer(lock.Key, snapshot) + server := fs.dlm.CalculateTargetServer(lock.Key, snapshot) if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{ Locks: []*filer_pb.Lock{ { - Name: lock.Key, - RenewToken: lock.Token, - ExpirationNs: lock.ExpirationNs, + Name: lock.Key, + RenewToken: lock.Token, + ExpiredAtNs: lock.ExpiredAtNs, }, }, }) |
