aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_dlm.go29
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go4
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go4
3 files changed, 29 insertions, 8 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go
index 189e6820e..7e8f93102 100644
--- a/weed/server/filer_grpc_server_dlm.go
+++ b/weed/server/filer_grpc_server_dlm.go
@@ -16,15 +16,21 @@ import (
// DistributedLock is a grpc handler to handle FilerServer's LockRequest
func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
+ glog.V(4).Infof("FILER LOCK: Received DistributedLock request - name=%s owner=%s renewToken=%s secondsToLock=%d isMoved=%v",
+ req.Name, req.Owner, req.RenewToken, req.SecondsToLock, req.IsMoved)
+
resp = &filer_pb.LockResponse{}
var movedTo pb.ServerAddress
expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
+ glog.V(4).Infof("FILER LOCK: LockWithTimeout result - name=%s lockOwner=%s renewToken=%s movedTo=%s err=%v",
+ req.Name, resp.LockOwner, resp.RenewToken, movedTo, err)
glog.V(4).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
+ glog.V(0).Infof("FILER LOCK: Forwarding to correct filer - from=%s to=%s", fs.option.Host, movedTo)
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
+ secondResp, err := client.DistributedLock(ctx, &filer_pb.LockRequest{
Name: req.Name,
SecondsToLock: req.SecondsToLock,
RenewToken: req.RenewToken,
@@ -35,6 +41,9 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
resp.RenewToken = secondResp.RenewToken
resp.LockOwner = secondResp.LockOwner
resp.Error = secondResp.Error
+ glog.V(0).Infof("FILER LOCK: Forwarded lock acquired - name=%s renewToken=%s", req.Name, resp.RenewToken)
+ } else {
+ glog.V(0).Infof("FILER LOCK: Forward failed - name=%s err=%v", req.Name, err)
}
return err
})
@@ -42,11 +51,15 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
if err != nil {
resp.Error = fmt.Sprintf("%v", err)
+ glog.V(0).Infof("FILER LOCK: Error - name=%s error=%s", req.Name, resp.Error)
}
if movedTo != "" {
resp.LockHostMovedTo = string(movedTo)
}
+ glog.V(4).Infof("FILER LOCK: Returning response - name=%s renewToken=%s lockOwner=%s error=%s movedTo=%s",
+ req.Name, resp.RenewToken, resp.LockOwner, resp.Error, resp.LockHostMovedTo)
+
return resp, nil
}
@@ -60,7 +73,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo
if !req.IsMoved && movedTo != "" {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
+ secondResp, err := client.DistributedUnlock(ctx, &filer_pb.UnlockRequest{
Name: req.Name,
RenewToken: req.RenewToken,
IsMoved: true,
@@ -85,7 +98,7 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock
owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
+ secondResp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{
Name: req.Name,
IsMoved: true,
})
@@ -132,8 +145,10 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
for _, lock := range locks {
server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot)
- if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
+ // Use a context with timeout for lock transfer to avoid hanging indefinitely
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.TransferLocks(ctx, &filer_pb.TransferLocksRequest{
Locks: []*filer_pb.Lock{
{
Name: lock.Key,
@@ -144,7 +159,9 @@ func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
},
})
return err
- }); err != nil {
+ })
+ cancel()
+ if err != nil {
// it may not be worth retrying, since the lock may have expired
glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index a0a192a10..f4df550e6 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -69,7 +69,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
- nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
position := log_buffer.NewMessagePosition(nextDayTs, -2)
found, err := fs.filer.HasPersistedLogFiles(position)
if err != nil {
@@ -171,7 +171,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
continue
}
// If no persisted entries were read for this day, check the next day for logs
- nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
position := log_buffer.NewMessagePosition(nextDayTs, -2)
found, err := fs.filer.HasPersistedLogFiles(position)
if err != nil {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index a535ff16c..ca36abcac 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -335,6 +335,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
if len(v) > 0 && len(v[0]) > 0 {
if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
entry.Extended[k] = []byte(v[0])
+ // Log version ID header specifically for debugging
+ if k == "Seaweed-X-Amz-Version-Id" {
+ glog.V(0).Infof("filer: storing version ID header in Extended: %s=%s for path=%s", k, v[0], path)
+ }
}
if k == "Response-Content-Disposition" {
entry.Extended["Content-Disposition"] = []byte(v[0])