diff options
Diffstat (limited to 'weed/server/filer_grpc_server_dlm.go')
| -rw-r--r-- | weed/server/filer_grpc_server_dlm.go | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go new file mode 100644 index 000000000..fdd299947 --- /dev/null +++ b/weed/server/filer_grpc_server_dlm.go @@ -0,0 +1,111 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "time" +) + +// Lock is a grpc handler to handle FilerServer's LockRequest +func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { + + resp = &filer_pb.LockResponse{} + snapshot := fs.filer.LockRing.GetSnapshot() + if snapshot == nil { + resp.Error = "no lock server found" + return + } else { + server := lock_manager.HashKeyToServer(req.Name, snapshot) + if server != fs.option.Host { + resp.Error = fmt.Sprintf("not the lock server for %s", req.Name) + resp.MovedTo = string(server) + return + } + } + + renewToken, err := fs.dlm.Lock(req.Name, time.Duration(req.SecondsToLock)*time.Second, req.PreviousLockToken) + + if err != nil { + resp.Error = fmt.Sprintf("%v", err) + } else { + resp.RenewToken = renewToken + } + + return resp, nil +} + +// Unlock is a grpc handler to handle FilerServer's UnlockRequest +func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) { + + resp = &filer_pb.UnlockResponse{} + snapshot := fs.filer.LockRing.GetSnapshot() + if snapshot == nil { + resp.Error = "no lock server found" + return + } else { + server := lock_manager.HashKeyToServer(req.Name, snapshot) + if server != fs.option.Host { + resp.Error = fmt.Sprintf("not the lock server for %s", req.Name) + resp.MovedTo = string(server) + return + } + } + + _, err = fs.dlm.Unlock(req.Name, req.LockToken) + if err != nil { + resp.Error = fmt.Sprintf("%v", err) + } + + return resp, nil + +} + +// TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest +func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) { + + now := time.Now() + for _, lock := range req.Locks { + duration := time.Duration(lock.ExpirationNs - now.UnixNano()) + if _, err := fs.dlm.Lock(lock.Name, duration, lock.RenewToken); err != nil { + glog.Errorf("receive transferred lock %v to %v: %v", lock.Name, fs.option.Host, err) + return nil, err + } + } + + return &filer_pb.TransferLocksResponse{}, nil + +} + +func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { + locks := fs.dlm.TakeOutLocksByKey(func(key string) bool { + server := lock_manager.HashKeyToServer(key, snapshot) + return server != fs.option.Host + }) + if len(locks) == 0 { + return + } + + for _, lock := range locks { + server := lock_manager.HashKeyToServer(lock.Key, snapshot) + if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{ + Locks: []*filer_pb.Lock{ + { + Name: lock.Key, + RenewToken: lock.Token, + ExpirationNs: lock.ExpirationNs, + }, + }, + }) + return err + }); err != nil { + // it may not be worth retrying, since the lock may have expired + glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err) + } + } + +} |
