diff options
Diffstat (limited to 'weed/server/filer_grpc_server_dlm.go')
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 189e6820e..7e8f93102 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -16,15 +16,21 @@ import ( // DistributedLock is a grpc handler to handle FilerServer's LockRequest func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { + glog.V(4).Infof("FILER LOCK: Received DistributedLock request - name=%s owner=%s renewToken=%s secondsToLock=%d isMoved=%v", + req.Name, req.Owner, req.RenewToken, req.SecondsToLock, req.IsMoved) + resp = &filer_pb.LockResponse{} var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) + glog.V(4).Infof("FILER LOCK: LockWithTimeout result - name=%s lockOwner=%s renewToken=%s movedTo=%s err=%v", + req.Name, resp.LockOwner, resp.RenewToken, movedTo, err) glog.V(4).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo) if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved { + glog.V(0).Infof("FILER LOCK: Forwarding to correct filer - from=%s to=%s", fs.option.Host, movedTo) err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ + secondResp, err := client.DistributedLock(ctx, &filer_pb.LockRequest{ Name: req.Name, SecondsToLock: req.SecondsToLock, RenewToken: req.RenewToken, @@ -35,6 +41,9 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe resp.RenewToken = secondResp.RenewToken resp.LockOwner = secondResp.LockOwner resp.Error = secondResp.Error + glog.V(0).Infof("FILER LOCK: Forwarded lock acquired - name=%s renewToken=%s", req.Name, resp.RenewToken) + } else { + glog.V(0).Infof("FILER LOCK: Forward failed - name=%s err=%v", req.Name, err) } return err }) @@ -42,11 +51,15 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe if err != nil { resp.Error = fmt.Sprintf("%v", err) + glog.V(0).Infof("FILER LOCK: Error - name=%s error=%s", req.Name, resp.Error) } if movedTo != "" { resp.LockHostMovedTo = string(movedTo) } + glog.V(4).Infof("FILER LOCK: Returning response - name=%s renewToken=%s lockOwner=%s error=%s movedTo=%s", + req.Name, resp.RenewToken, resp.LockOwner, resp.Error, resp.LockHostMovedTo) + return resp, nil } @@ -60,7 +73,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo if !req.IsMoved && movedTo != "" { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ + secondResp, err := client.DistributedUnlock(ctx, &filer_pb.UnlockRequest{ Name: req.Name, RenewToken: req.RenewToken, IsMoved: true, @@ -85,7 +98,7 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name) if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ + secondResp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{ Name: req.Name, IsMoved: true, }) @@ -132,8 +145,10 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { for _, lock := range locks { server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot) - if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{ + // Use a context with timeout for lock transfer to avoid hanging indefinitely + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.TransferLocks(ctx, &filer_pb.TransferLocksRequest{ Locks: []*filer_pb.Lock{ { Name: lock.Key, @@ -144,7 +159,9 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { }, }) return err - }); err != nil { + }) + cancel() + if err != nil { // it may not be worth retrying, since the lock may have expired glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err) } |
