aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml7
-rw-r--r--pkg/driver/controllerserver.go23
-rw-r--r--pkg/driver/nodeserver.go191
-rw-r--r--pkg/driver/utils.go45
-rw-r--r--pkg/driver/volume.go147
5 files changed, 331 insertions, 82 deletions
diff --git a/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml b/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml
index 60a1338..8e768d3 100644
--- a/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml
+++ b/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml
@@ -87,6 +87,9 @@ spec:
volumeMounts:
- name: plugin-dir
mountPath: /csi
+ - name: plugins-dir
+ mountPath: /var/lib/kubelet/plugins
+ mountPropagation: "Bidirectional"
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
@@ -105,6 +108,10 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins/{{ .Values.driverName }}
type: DirectoryOrCreate
+ - name: plugins-dir
+ hostPath:
+ path: /var/lib/kubelet/plugins
+ type: Directory
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index 6a8585c..3ab0853 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -11,11 +11,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/status"
)
@@ -175,25 +172,7 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
}
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
- volumeID := req.GetVolumeId()
- glog.V(0).Infof("Controller expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
-
- localSocket := GetLocalSocket(volumeID)
- clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, err
- }
- defer clientConn.Close()
-
- client := mount_pb.NewSeaweedMountClient(clientConn)
- _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
- CollectionCapacity: req.CapacityRange.RequiredBytes,
- })
-
- return &csi.ControllerExpandVolumeResponse{
- CapacityBytes: req.CapacityRange.RequiredBytes,
- }, err
-
+ return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index d48875f..5a7206e 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -4,13 +4,11 @@ import (
"context"
"os"
"strings"
+ "sync"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/status"
"k8s.io/utils/mount"
@@ -19,16 +17,19 @@ import (
type NodeServer struct {
Driver *SeaweedFsDriver
mounter mount.Interface
+
+ // information about the managed volumes
+ volumes sync.Map
+ volumeMutexes *KeyMutex
}
var _ = csi.NodeServer(&NodeServer{})
-func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
- targetPath := req.GetTargetPath()
-
- glog.V(0).Infof("NodePublishVolume volume %s to %s", volumeID, targetPath)
+ stagingTargetPath := req.GetStagingTargetPath()
+ glog.V(0).Infof("node stage volume %s to %s", volumeID, stagingTargetPath)
// Check arguments
if req.GetVolumeCapability() == nil {
@@ -40,25 +41,30 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
- if targetPath == "" {
+ if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- // check whether it can be mounted
- notMnt, err := checkMount(targetPath)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- if !notMnt {
- return &csi.NodePublishVolumeResponse{}, nil
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ // The volume has been staged.
+ if _, ok := ns.volumes.Load(volumeID); ok {
+ glog.V(0).Infof("volume %s has been staged", volumeID)
+ return &csi.NodeStageVolumeResponse{}, nil
}
volContext := req.GetVolumeContext()
- mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext)
+ readOnly := isVolumeReadOnly(req)
+
+ mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
return nil, err
}
- if err := mounter.Mount(targetPath); err != nil {
+
+ volume := NewVolume(volumeID, mounter)
+ if err := volume.Stage(stagingTargetPath); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
@@ -68,25 +74,75 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.Internal, err.Error())
}
- glog.V(0).Infof("volume %s successfully mounted to %s", volumeID, targetPath)
+ ns.volumes.Store(volumeID, volume)
+ glog.V(0).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)
+
+ return &csi.NodeStageVolumeResponse{}, nil
+}
+
+// NodePublishVolume makes an already staged volume available at the
+// request's target path by delegating to Volume.Publish.
+//
+// It returns InvalidArgument when the volume capability, volume ID or target
+// path is missing, FailedPrecondition when the volume is not present in
+// ns.volumes (i.e. it has not been staged through this server), and Internal
+// when Volume.Publish fails.
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ targetPath := req.GetTargetPath()
+
+ glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath)
+
+ // Check arguments
+ if req.GetVolumeCapability() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
+ }
+ // NOTE(review): the rejection below is commented out, so a capability that
+ // fails isValidVolumeCapabilities is currently accepted.
+ if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) {
+ // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
+ }
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if targetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+
+ // Hold the mutex handed out for this volume ID until the call returns.
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ // Only volumes recorded in ns.volumes can be published.
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
+ }
+
+ // When pod uses a volume in read-only mode, k8s will automatically
+ // mount the volume as a read-only file system.
+ if err := volume.(*Volume).Publish(targetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ glog.V(0).Infof("volume %s successfully published to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
-
+ volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
+ glog.V(0).Infof("node unpublish volume %s from %s", volumeID, targetPath)
+
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- if err := fuseUnmount(targetPath); err != nil {
- return nil, status.Error(codes.Internal, err.Error())
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ glog.Warningf("volume %s hasn't been published", volumeID)
+ return &csi.NodeUnpublishVolumeResponse{}, nil
}
- err := os.Remove(targetPath)
- if err != nil && !os.IsNotExist(err) {
+ if err := volume.(*Volume).Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -109,6 +165,13 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
// Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
//Type: csi.NodeServiceCapability_RPC_UNKNOWN,
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
@@ -124,60 +187,72 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol
}
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ stagingTargetPath := req.GetStagingTargetPath()
+ glog.V(0).Infof("node unstage volume %s from %s", volumeID, stagingTargetPath)
+
// Check arguments
- if req.GetVolumeId() == "" {
+ if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
- if req.GetStagingTargetPath() == "" {
+ if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
- return &csi.NodeUnstageVolumeResponse{}, nil
-}
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
-func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
- // Check arguments
- if req.GetVolumeId() == "" {
- return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ glog.Warningf("volume %s hasn't been staged", volumeID)
+ return &csi.NodeUnstageVolumeResponse{}, nil
}
- if req.GetStagingTargetPath() == "" {
- return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+
+ if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ } else {
+ ns.volumes.Delete(volumeID)
}
- return &csi.NodeStageVolumeResponse{}, nil
+
+ return &csi.NodeUnstageVolumeResponse{}, nil
}
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
- glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
+ requiredBytes := req.GetCapacityRange().GetRequiredBytes()
+ glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, requiredBytes)
- localSocket := GetLocalSocket(volumeID)
- clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, err
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ if volume, ok := ns.volumes.Load(volumeID); ok {
+ if err := volume.(*Volume).Expand(requiredBytes); err != nil {
+ return nil, err
+ }
}
- defer clientConn.Close()
- client := mount_pb.NewSeaweedMountClient(clientConn)
- _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
- CollectionCapacity: req.CapacityRange.RequiredBytes,
- })
+ return &csi.NodeExpandVolumeResponse{}, nil
+}
- return &csi.NodeExpandVolumeResponse{
- CapacityBytes: req.CapacityRange.RequiredBytes,
- }, err
+func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex {
+ return ns.volumeMutexes.GetMutex(volumeID)
}
-func checkMount(targetPath string) (bool, error) {
- notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
- if err != nil {
- if os.IsNotExist(err) {
- if err = os.MkdirAll(targetPath, 0750); err != nil {
- return false, err
- }
- notMnt = true
- } else {
- return false, err
+func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
+ mode := req.GetVolumeCapability().GetAccessMode().Mode
+
+ readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
+ csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+ csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+ }
+
+ for _, readOnlyMode := range readOnlyModes {
+ if mode == readOnlyMode {
+ return true
}
}
- return notMnt, nil
+
+ return false
}
diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go
index b783f99..0f45733 100644
--- a/pkg/driver/utils.go
+++ b/pkg/driver/utils.go
@@ -2,18 +2,23 @@ package driver
import (
"fmt"
+ "os"
"strings"
+ "sync"
- "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "k8s.io/utils/mount"
)
func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
return &NodeServer{
- Driver: n,
+ Driver: n,
+ volumeMutexes: NewKeyMutex(32),
}
}
@@ -58,3 +63,39 @@ func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h
glog.V(3).Infof("GRPC %s response %+v", info.FullMethod, resp)
return resp, err
}
+
+// checkMount probes targetPath with IsLikelyNotMountPoint and creates the
+// directory when it does not exist yet.
+//
+// It returns the probe's result when the probe succeeds, true when the
+// directory was missing and has just been created, and false together with
+// the error when either the probe or the directory creation fails.
+func checkMount(targetPath string) (bool, error) {
+	notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
+	if err == nil {
+		return notMnt, nil
+	}
+	if !os.IsNotExist(err) {
+		return false, err
+	}
+
+	// The path is missing: create it so it can be mounted onto.
+	if mkErr := os.MkdirAll(targetPath, 0750); mkErr != nil {
+		return false, mkErr
+	}
+	return true, nil
+}
+
+// KeyMutex is a fixed-size pool of read/write mutexes that are handed out by
+// hashing a string key (see GetMutex). Distinct keys can map to the same
+// mutex.
+type KeyMutex struct {
+ // mutexes is the pool itself; it is allocated once in NewKeyMutex.
+ mutexes []sync.RWMutex
+ // size is the number of entries in mutexes, used as the modulus when a
+ // key hash is turned into a pool index.
+ size int32
+}
+
+// NewKeyMutex allocates a KeyMutex whose pool holds size mutexes.
+func NewKeyMutex(size int32) *KeyMutex {
+	km := new(KeyMutex)
+	km.size = size
+	km.mutexes = make([]sync.RWMutex, size)
+	return km
+}
+
+// GetMutex returns the mutex assigned to key. The same key always yields the
+// same mutex; different keys may share one because the pool is finite.
+func (km *KeyMutex) GetMutex(key string) *sync.RWMutex {
+	// Reduce before taking the absolute value. Negating math.MinInt32
+	// overflows and stays negative, which would produce a negative slice
+	// index for any pool size that does not divide 2^31. After the modulus
+	// the value lies strictly between -size and size, so negating it is safe.
+	index := util.HashToInt32([]byte(key)) % km.size
+	if index < 0 {
+		index = -index
+	}
+
+	return &km.mutexes[index]
+}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
new file mode 100644
index 0000000..44b4e65
--- /dev/null
+++ b/pkg/driver/volume.go
@@ -0,0 +1,147 @@
+package driver
+
+import (
+ "context"
+ "fmt"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+// Volume holds the node-local state of a single volume: where it is mounted,
+// which target paths it has been published to, and the mounter used to mount
+// it.
+type Volume struct {
+ // VolumeId is the ID of the volume this state belongs to.
+ VolumeId string
+
+ // stagingTargetPath is the volume's real mount point. It is set by Stage
+ // after a successful mount; an empty value means "not staged".
+ stagingTargetPath string
+
+ // Target paths to which the volume has been published.
+ // These paths are symbolic links to the real mount point,
+ // so multiple pods using the same volume can share a mount.
+ targetPaths map[string]bool
+
+ // mounter performs the actual mount in Stage.
+ mounter Mounter
+
+ // localSocket caches the unix socket path used to manage the volume;
+ // it is filled in lazily by getLocalSocket.
+ localSocket string
+}
+
+// NewVolume creates the state record for volumeID. The volume starts out
+// unstaged and with an empty set of published target paths.
+func NewVolume(volumeID string, mounter Mounter) *Volume {
+	vol := &Volume{VolumeId: volumeID, mounter: mounter}
+	vol.targetPaths = map[string]bool{}
+	return vol
+}
+
+// Stage mounts the volume at stagingTargetPath and records that path as the
+// volume's real mount point. It is a no-op when the volume is already staged.
+//
+// It returns the error from checkMount or from the mounter; on error the
+// volume stays unstaged.
+func (vol *Volume) Stage(stagingTargetPath string) error {
+	if vol.isStaged() {
+		return nil
+	}
+
+	// check whether it can be mounted
+	notMnt, err := checkMount(stagingTargetPath)
+	if err != nil {
+		return err
+	}
+
+	// Mount only when nothing is mounted there yet. A path that already is a
+	// mount point is presumed to be an earlier mount of this volume and is
+	// adopted as it is.
+	if notMnt {
+		if err := vol.mounter.Mount(stagingTargetPath); err != nil {
+			return err
+		}
+	}
+
+	// Record the path in both cases. Reporting success without it would leave
+	// isStaged() false, so Publish would link to an empty path and Unstage
+	// would silently skip the unmount.
+	vol.stagingTargetPath = stagingTargetPath
+	return nil
+}
+
+// Publish exposes the staged volume at targetPath by creating a symbolic link
+// from targetPath to the staging mount point, replacing whatever entry was
+// there before, and remembers targetPath as published.
+//
+// It returns an error when the volume has not been staged, or when removing
+// the old entry or creating the link fails.
+func (vol *Volume) Publish(targetPath string) error {
+	// Without a staging path the link would point at "", so fail before
+	// touching the file system.
+	if !vol.isStaged() {
+		return fmt.Errorf("volume %s hasn't been staged yet", vol.VolumeId)
+	}
+
+	// Clear a leftover entry so that the symlink can be (re)created.
+	if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+
+	if err := os.Symlink(vol.stagingTargetPath, targetPath); err != nil {
+		return err
+	}
+
+	vol.targetPaths[targetPath] = true
+	return nil
+}
+
+// Expand sends sizeByte as the new collection capacity to the mount service
+// listening on the volume's local unix socket. It does nothing and returns
+// nil when the volume is not staged; otherwise it returns the dial or RPC
+// error, if any.
+func (vol *Volume) Expand(sizeByte int64) error {
+	if !vol.isStaged() {
+		return nil
+	}
+
+	conn, err := grpc.Dial(
+		fmt.Sprintf("passthrough:///unix://%s", vol.getLocalSocket()),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	request := &mount_pb.ConfigureRequest{CollectionCapacity: sizeByte}
+	_, err = mount_pb.NewSeaweedMountClient(conn).Configure(context.Background(), request)
+	return err
+}
+
+// Unpublish removes the link at targetPath and forgets the path. A path the
+// volume was never published to is logged and treated as success. A link that
+// is already gone also counts as success.
+func (vol *Volume) Unpublish(targetPath string) error {
+	// Check whether the volume is published to the target path.
+	if _, ok := vol.targetPaths[targetPath]; !ok {
+		glog.Warningf("volume %s hasn't been published to %s", vol.VolumeId, targetPath)
+		return nil
+	}
+
+	if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+
+	// Forget the path only once the link is really gone. Dropping it first
+	// would make a retry after a failed removal report success while the
+	// link is still in place.
+	delete(vol.targetPaths, targetPath)
+	return nil
+}
+
+// Unstage unmounts the volume from the staging path recorded by Stage and
+// then removes that path. The argument is ignored. It is a no-op for a
+// volume that is not staged, and a staging path that no longer exists is not
+// treated as an error.
+func (vol *Volume) Unstage(_ string) error {
+	if !vol.isStaged() {
+		return nil
+	}
+
+	glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, vol.stagingTargetPath)
+	if err := fuseUnmount(vol.stagingTargetPath); err != nil {
+		return err
+	}
+
+	err := os.Remove(vol.stagingTargetPath)
+	if err == nil || os.IsNotExist(err) {
+		return nil
+	}
+	return err
+}
+
+// isStaged reports whether a staging path has been recorded for the volume.
+func (vol *Volume) isStaged() bool {
+	return len(vol.stagingTargetPath) > 0
+}
+
+// getLocalSocket returns the unix socket path used to manage the volume. The
+// path is derived from a hash of the volume ID on first use and cached in
+// vol.localSocket afterwards.
+func (vol *Volume) getLocalSocket() string {
+	if vol.localSocket == "" {
+		hash := util.HashToInt32([]byte(vol.VolumeId))
+		if hash < 0 {
+			hash = -hash
+		}
+		// NOTE(review): presumably this has to match the name the mount
+		// process derives for its socket — confirm before changing it.
+		vol.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash)
+	}
+	return vol.localSocket
+}