diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-16 15:05:38 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-16 15:05:38 -0700 |
| commit | 482742514656e9b5a652acf7406740fbc55db13d (patch) | |
| tree | 9be51ec65888a0741f63912c9bc125d0278d3360 /weed/server/filer_grpc_server_dlm.go | |
| parent | 3b50139f68d5f59961113cf8fd0b903a7294a6ca (diff) | |
| download | seaweedfs-482742514656e9b5a652acf7406740fbc55db13d.tar.xz seaweedfs-482742514656e9b5a652acf7406740fbc55db13d.zip | |
balancer works
Diffstat (limited to 'weed/server/filer_grpc_server_dlm.go')
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 44 |
1 files changed, 36 insertions, 8 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 5da8d9718..cf7014cd3 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -6,24 +6,28 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "time" ) -// Lock is a grpc handler to handle FilerServer's LockRequest -func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { +// DistributedLock is a grpc handler to handle FilerServer's LockRequest +func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { resp = &filer_pb.LockResponse{} var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() - resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken) - if !req.IsMoved && movedTo != "" { + resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) + glog.Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo) + if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.Lock(context.Background(), &filer_pb.LockRequest{ + secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: req.Name, SecondsToLock: req.SecondsToLock, RenewToken: req.RenewToken, IsMoved: true, + Owner: req.Owner, }) if err == nil { resp.RenewToken = secondResp.RenewToken @@ -45,7 +49,7 @@ func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (res } // Unlock is a grpc handler to handle FilerServer's UnlockRequest -func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) { +func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) { resp = &filer_pb.UnlockResponse{} @@ -54,7 +58,7 @@ func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) if !req.IsMoved && movedTo != "" { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{ + secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: req.Name, RenewToken: req.RenewToken, IsMoved: true, @@ -75,11 +79,34 @@ func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) } +func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) { + owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if !req.IsMoved && movedTo != "" { + err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ + Name: req.Name, + IsMoved: true, + }) + owner = secondResp.Owner + return err + }) + if err != nil { + return nil, err + } + } + return &filer_pb.FindLockOwnerResponse{ + Owner: owner, + }, nil +} + // TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) { for _, lock := range req.Locks { - fs.filer.Dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken) + fs.filer.Dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken, lock.Owner) } return &filer_pb.TransferLocksResponse{}, nil @@ -101,6 +128,7 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { Name: lock.Key, RenewToken: lock.Token, ExpiredAtNs: lock.ExpiredAtNs, + Owner: lock.Owner, }, }, }) |
