diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 29 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 4 |
3 files changed, 29 insertions, 8 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 189e6820e..7e8f93102 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -16,15 +16,21 @@ import ( // DistributedLock is a grpc handler to handle FilerServer's LockRequest func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { + glog.V(4).Infof("FILER LOCK: Received DistributedLock request - name=%s owner=%s renewToken=%s secondsToLock=%d isMoved=%v", + req.Name, req.Owner, req.RenewToken, req.SecondsToLock, req.IsMoved) + resp = &filer_pb.LockResponse{} var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) + glog.V(4).Infof("FILER LOCK: LockWithTimeout result - name=%s lockOwner=%s renewToken=%s movedTo=%s err=%v", + req.Name, resp.LockOwner, resp.RenewToken, movedTo, err) glog.V(4).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo) if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved { + glog.V(0).Infof("FILER LOCK: Forwarding to correct filer - from=%s to=%s", fs.option.Host, movedTo) err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ + secondResp, err := client.DistributedLock(ctx, &filer_pb.LockRequest{ Name: req.Name, SecondsToLock: req.SecondsToLock, RenewToken: req.RenewToken, @@ -35,6 +41,9 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe resp.RenewToken = secondResp.RenewToken resp.LockOwner = secondResp.LockOwner resp.Error = secondResp.Error + glog.V(0).Infof("FILER LOCK: Forwarded lock acquired - name=%s renewToken=%s", req.Name, resp.RenewToken) + } else { + glog.V(0).Infof("FILER LOCK: Forward failed - name=%s err=%v", req.Name, err) } return err }) @@ -42,11 +51,15 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe if err != nil { resp.Error = fmt.Sprintf("%v", err) + glog.V(0).Infof("FILER LOCK: Error - name=%s error=%s", req.Name, resp.Error) } if movedTo != "" { resp.LockHostMovedTo = string(movedTo) } + glog.V(4).Infof("FILER LOCK: Returning response - name=%s renewToken=%s lockOwner=%s error=%s movedTo=%s", + req.Name, resp.RenewToken, resp.LockOwner, resp.Error, resp.LockHostMovedTo) + return resp, nil } @@ -60,7 +73,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo if !req.IsMoved && movedTo != "" { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ + secondResp, err := client.DistributedUnlock(ctx, &filer_pb.UnlockRequest{ Name: req.Name, RenewToken: req.RenewToken, IsMoved: true, @@ -85,7 +98,7 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name) if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ + secondResp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{ Name: req.Name, IsMoved: true, }) @@ -132,8 +145,10 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { for _, lock := range locks { server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot) - if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{ + // Use a context with timeout for lock transfer to avoid hanging indefinitely + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.TransferLocks(ctx, &filer_pb.TransferLocksRequest{ Locks: []*filer_pb.Lock{ { Name: lock.Key, @@ -144,7 +159,9 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { }, }) return err - }); err != nil { + }) + cancel() + if err != nil { // it may not be worth retrying, since the lock may have expired glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err) } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index a0a192a10..f4df550e6 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -69,7 +69,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano()) + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) position := log_buffer.NewMessagePosition(nextDayTs, -2) found, err := fs.filer.HasPersistedLogFiles(position) if err != nil { @@ -171,7 +171,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq continue } // If no persisted entries were read for this day, check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano()) + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) position := log_buffer.NewMessagePosition(nextDayTs, -2) found, err := fs.filer.HasPersistedLogFiles(position) if err != nil { diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index a535ff16c..ca36abcac 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -335,6 +335,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa if len(v) > 0 && len(v[0]) > 0 { if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { entry.Extended[k] = []byte(v[0]) + // Log version ID header specifically for debugging + if k == "Seaweed-X-Amz-Version-Id" { + glog.V(0).Infof("filer: storing version ID header in Extended: %s=%s for path=%s", k, v[0], path) + } } if k == "Response-Content-Disposition" { entry.Extended["Content-Disposition"] = []byte(v[0]) |
