aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver/nodeserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/driver/nodeserver.go')
-rw-r--r--pkg/driver/nodeserver.go191
1 files changed, 133 insertions, 58 deletions
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index d48875f..5a7206e 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -4,13 +4,11 @@ import (
"context"
"os"
"strings"
+ "sync"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/status"
"k8s.io/utils/mount"
@@ -19,16 +17,19 @@ import (
type NodeServer struct {
Driver *SeaweedFsDriver
mounter mount.Interface
+
+ // information about the managed volumes
+ volumes sync.Map
+ volumeMutexes *KeyMutex
}
var _ = csi.NodeServer(&NodeServer{})
-func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
- targetPath := req.GetTargetPath()
-
- glog.V(0).Infof("NodePublishVolume volume %s to %s", volumeID, targetPath)
+ stagingTargetPath := req.GetStagingTargetPath()
+ glog.V(0).Infof("node stage volume %s to %s", volumeID, stagingTargetPath)
// Check arguments
if req.GetVolumeCapability() == nil {
@@ -40,25 +41,30 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
- if targetPath == "" {
+ if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- // check whether it can be mounted
- notMnt, err := checkMount(targetPath)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- if !notMnt {
- return &csi.NodePublishVolumeResponse{}, nil
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ // The volume has been staged.
+ if _, ok := ns.volumes.Load(volumeID); ok {
+ glog.V(0).Infof("volume %s has been staged", volumeID)
+ return &csi.NodeStageVolumeResponse{}, nil
}
volContext := req.GetVolumeContext()
- mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext)
+ readOnly := isVolumeReadOnly(req)
+
+ mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
return nil, err
}
- if err := mounter.Mount(targetPath); err != nil {
+
+ volume := NewVolume(volumeID, mounter)
+ if err := volume.Stage(stagingTargetPath); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
@@ -68,25 +74,75 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.Internal, err.Error())
}
- glog.V(0).Infof("volume %s successfully mounted to %s", volumeID, targetPath)
+ ns.volumes.Store(volumeID, volume)
+ glog.V(0).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)
+
+ return &csi.NodeStageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ targetPath := req.GetTargetPath()
+
+ glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath)
+
+ // Check arguments
+ if req.GetVolumeCapability() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
+ }
+ if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) {
+ // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
+ }
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if targetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
+ }
+
+ // When pod uses a volume in read-only mode, k8s will automatically
+ // mount the volume as a read-only file system.
+ if err := volume.(*Volume).Publish(targetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ glog.V(0).Infof("volume %s successfully published to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
-
+ volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
+ glog.V(0).Infof("node unpublish volume %s from %s", volumeID, targetPath)
+
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- if err := fuseUnmount(targetPath); err != nil {
- return nil, status.Error(codes.Internal, err.Error())
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ glog.Warningf("volume %s hasn't been published", volumeID)
+ return &csi.NodeUnpublishVolumeResponse{}, nil
}
- err := os.Remove(targetPath)
- if err != nil && !os.IsNotExist(err) {
+ if err := volume.(*Volume).Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -109,6 +165,13 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
// Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
//Type: csi.NodeServiceCapability_RPC_UNKNOWN,
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
@@ -124,60 +187,72 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol
}
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ stagingTargetPath := req.GetStagingTargetPath()
+ glog.V(0).Infof("node unstage volume %s from %s", volumeID, stagingTargetPath)
+
// Check arguments
- if req.GetVolumeId() == "" {
+ if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
- if req.GetStagingTargetPath() == "" {
+ if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- return &csi.NodeUnstageVolumeResponse{}, nil
-}
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
-func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
- // Check arguments
- if req.GetVolumeId() == "" {
- return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ glog.Warningf("volume %s hasn't been staged", volumeID)
+ return &csi.NodeUnstageVolumeResponse{}, nil
}
- if req.GetStagingTargetPath() == "" {
- return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+
+ if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ } else {
+ ns.volumes.Delete(volumeID)
}
- return &csi.NodeStageVolumeResponse{}, nil
+
+ return &csi.NodeUnstageVolumeResponse{}, nil
}
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
- glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
+ requiredBytes := req.GetCapacityRange().GetRequiredBytes()
+ glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, requiredBytes)
- localSocket := GetLocalSocket(volumeID)
- clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, err
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ if volume, ok := ns.volumes.Load(volumeID); ok {
+ if err := volume.(*Volume).Expand(requiredBytes); err != nil {
+ return nil, err
+ }
}
- defer clientConn.Close()
- client := mount_pb.NewSeaweedMountClient(clientConn)
- _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
- CollectionCapacity: req.CapacityRange.RequiredBytes,
- })
+ return &csi.NodeExpandVolumeResponse{}, nil
+}
- return &csi.NodeExpandVolumeResponse{
- CapacityBytes: req.CapacityRange.RequiredBytes,
- }, err
+func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex {
+ return ns.volumeMutexes.GetMutex(volumeID)
}
-func checkMount(targetPath string) (bool, error) {
- notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
- if err != nil {
- if os.IsNotExist(err) {
- if err = os.MkdirAll(targetPath, 0750); err != nil {
- return false, err
- }
- notMnt = true
- } else {
- return false, err
+func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
+ mode := req.GetVolumeCapability().GetAccessMode().Mode
+
+ readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
+ csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+ csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+ }
+
+ for _, readOnlyMode := range readOnlyModes {
+ if mode == readOnlyMode {
+ return true
}
}
- return notMnt, nil
+
+ return false
}