aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_dlm.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server_dlm.go')
-rw-r--r--weed/server/filer_grpc_server_dlm.go111
1 files changed, 111 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go
new file mode 100644
index 000000000..fdd299947
--- /dev/null
+++ b/weed/server/filer_grpc_server_dlm.go
@@ -0,0 +1,111 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "time"
+)
+
+// Lock is a grpc handler to handle FilerServer's LockRequest
+func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
+
+ resp = &filer_pb.LockResponse{}
+ snapshot := fs.filer.LockRing.GetSnapshot()
+ if snapshot == nil {
+ resp.Error = "no lock server found"
+ return
+ } else {
+ server := lock_manager.HashKeyToServer(req.Name, snapshot)
+ if server != fs.option.Host {
+ resp.Error = fmt.Sprintf("not the lock server for %s", req.Name)
+ resp.MovedTo = string(server)
+ return
+ }
+ }
+
+ renewToken, err := fs.dlm.Lock(req.Name, time.Duration(req.SecondsToLock)*time.Second, req.PreviousLockToken)
+
+ if err != nil {
+ resp.Error = fmt.Sprintf("%v", err)
+ } else {
+ resp.RenewToken = renewToken
+ }
+
+ return resp, nil
+}
+
+// Unlock is a grpc handler to handle FilerServer's UnlockRequest
+func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) {
+
+ resp = &filer_pb.UnlockResponse{}
+ snapshot := fs.filer.LockRing.GetSnapshot()
+ if snapshot == nil {
+ resp.Error = "no lock server found"
+ return
+ } else {
+ server := lock_manager.HashKeyToServer(req.Name, snapshot)
+ if server != fs.option.Host {
+ resp.Error = fmt.Sprintf("not the lock server for %s", req.Name)
+ resp.MovedTo = string(server)
+ return
+ }
+ }
+
+ _, err = fs.dlm.Unlock(req.Name, req.LockToken)
+ if err != nil {
+ resp.Error = fmt.Sprintf("%v", err)
+ }
+
+ return resp, nil
+
+}
+
+// TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest
+func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) {
+
+ now := time.Now()
+ for _, lock := range req.Locks {
+ duration := time.Duration(lock.ExpirationNs - now.UnixNano())
+ if _, err := fs.dlm.Lock(lock.Name, duration, lock.RenewToken); err != nil {
+ glog.Errorf("receive transferred lock %v to %v: %v", lock.Name, fs.option.Host, err)
+ return nil, err
+ }
+ }
+
+ return &filer_pb.TransferLocksResponse{}, nil
+
+}
+
+func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
+ locks := fs.dlm.TakeOutLocksByKey(func(key string) bool {
+ server := lock_manager.HashKeyToServer(key, snapshot)
+ return server != fs.option.Host
+ })
+ if len(locks) == 0 {
+ return
+ }
+
+ for _, lock := range locks {
+ server := lock_manager.HashKeyToServer(lock.Key, snapshot)
+ if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
+ Locks: []*filer_pb.Lock{
+ {
+ Name: lock.Key,
+ RenewToken: lock.Token,
+ ExpirationNs: lock.ExpirationNs,
+ },
+ },
+ })
+ return err
+ }); err != nil {
+ // it may not be worth retrying, since the lock may have expired
+ glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
+ }
+ }
+
+}