aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorchrislu <chrislu@roblox.com>2024-01-18 08:06:24 -0800
committerchrislu <chrislu@roblox.com>2024-01-18 08:06:24 -0800
commitb982553bf132fa850841f9997e182c6112dd22f3 (patch)
treec94ea66e80a85b7ea36321f590e7549f5d4a54cf /pkg
parent1c7e9db74fa8719d84804b30072af1bb680d8332 (diff)
downloadseaweedfs-csi-driver-b982553bf132fa850841f9997e182c6112dd22f3.tar.xz
seaweedfs-csi-driver-b982553bf132fa850841f9997e182c6112dd22f3.zip
Revert "chore(seaweedfs-csi-driver): delete unnecessary stage"
This reverts commit 44283c0ffe56e3180dae5b93801d07a3d621d355.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/mounter_seaweedfs.go2
-rw-r--r--pkg/driver/nodeserver.go141
-rw-r--r--pkg/driver/volume.go68
3 files changed, 166 insertions, 45 deletions
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index d3ffe8a..56ba98b 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -50,7 +50,7 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
}
// CacheDirForRead should be always defined - we use temp dir in case it is not defined
- // we need to use predictable cache path, because we need to clean it up on unpublish
+ // we need to use predictable cache path, because we need to clean it up on unstage
cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID)
// Final args
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index c0572a8..616c79b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -26,12 +26,12 @@ type NodeServer struct {
var _ = csi.NodeServer(&NodeServer{})
-func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
- targetPath := req.GetTargetPath()
+ stagingTargetPath := req.GetStagingTargetPath()
- glog.Infof("node target volume %s to %s", volumeID, targetPath)
+ glog.Infof("node stage volume %s to %s", volumeID, stagingTargetPath)
// Check arguments
if req.GetVolumeCapability() == nil {
@@ -43,7 +43,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
- if targetPath == "" {
+ if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
@@ -51,10 +51,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeMutex.Lock()
defer volumeMutex.Unlock()
- // The volume has been publish.
+ // The volume has been staged.
if _, ok := ns.volumes.Load(volumeID); ok {
- glog.Infof("volume %s has been already published", volumeID)
- return &csi.NodePublishVolumeResponse{}, nil
+ glog.Infof("volume %s has been already staged", volumeID)
+ return &csi.NodeStageVolumeResponse{}, nil
}
volContext := req.GetVolumeContext()
@@ -62,14 +62,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
- // node publish is unsuccessfull
+ // node stage is unsuccessfull
ns.removeVolumeMutex(volumeID)
return nil, err
}
volume := NewVolume(volumeID, mounter)
- if err := volume.Publish(targetPath); err != nil {
- // node publish is unsuccessfull
+ if err := volume.Stage(stagingTargetPath); err != nil {
+ // node stage is unsuccessfull
ns.removeVolumeMutex(volumeID)
if os.IsPermission(err) {
@@ -90,21 +90,63 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
ns.volumes.Store(volumeID, volume)
- glog.Infof("volume %s successfully publish to %s", volumeID, targetPath)
+ glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)
+ return &csi.NodeStageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ targetPath := req.GetTargetPath()
+ stagingTargetPath := req.GetStagingTargetPath()
+
+ glog.Infof("node publish volume %s to %s", volumeID, targetPath)
+
+ // Check arguments
+ if req.GetVolumeCapability() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
+ }
+ if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) {
+ // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
+ }
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if targetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+ if stagingTargetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
+ }
+
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
+ }
+
+ // When pod uses a volume in read-only mode, k8s will automatically
+ // mount the volume as a read-only file system.
+ if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
+ glog.Infof("volume %s successfully published to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
-
glog.Infof("node unpublish volume %s from %s", volumeID, targetPath)
- // Check arguments
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
+
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
@@ -119,18 +161,15 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
// make sure there is no any garbage
_ = mount.CleanupMountPoint(targetPath, mountutil, true)
- } else {
- if err := volume.(*Volume).Unpublish(targetPath); err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- } else {
- ns.volumes.Delete(volumeID)
- }
+
+ return &csi.NodeUnpublishVolumeResponse{}, nil
}
- // remove mutex on successfull unpublish
- ns.volumeMutexes.RemoveMutex(volumeID)
+ if err := volume.(*Volume).Unpublish(targetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
- glog.Infof("volume %s successfully unpublish from %s", volumeID, targetPath)
+ glog.Infof("volume %s successfully unpublished from %s", volumeID, targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
@@ -145,11 +184,19 @@ func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
glog.V(3).Infof("node get capabilities")
+
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
@@ -158,6 +205,46 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
}, nil
}
+func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ stagingTargetPath := req.GetStagingTargetPath()
+
+ glog.Infof("node unstage volume %s from %s", volumeID, stagingTargetPath)
+
+ // Check arguments
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if stagingTargetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+
+ volumeMutex := ns.getVolumeMutex(volumeID)
+ volumeMutex.Lock()
+ defer volumeMutex.Unlock()
+
+ volume, ok := ns.volumes.Load(volumeID)
+ if !ok {
+ glog.Warningf("volume %s hasn't been staged", volumeID)
+
+ // make sure there is no any garbage
+ _ = mount.CleanupMountPoint(stagingTargetPath, mountutil, true)
+ } else {
+ if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ } else {
+ ns.volumes.Delete(volumeID)
+ }
+ }
+
+ // remove mutex on successfull unstage
+ ns.volumeMutexes.RemoveMutex(volumeID)
+
+ glog.Infof("volume %s successfully unstaged from %s", volumeID, stagingTargetPath)
+
+ return &csi.NodeUnstageVolumeResponse{}, nil
+}
+
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
volumePath := req.GetVolumePath()
@@ -192,11 +279,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
func (ns *NodeServer) NodeCleanup() {
ns.volumes.Range(func(_, vol any) bool {
v := vol.(*Volume)
- if len(v.TargetPath) > 0 {
- glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath)
- err := v.Unpublish(v.TargetPath)
+ if len(v.StagedPath) > 0 {
+ glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath)
+ err := v.Unstage(v.StagedPath)
if err != nil {
- glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err)
+ glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err)
}
}
return true
@@ -211,7 +298,7 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) {
ns.volumeMutexes.RemoveMutex(volumeID)
}
-func isVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool {
+func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
mode := req.GetVolumeCapability().GetAccessMode().Mode
readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 38d1308..ac0a80a 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -9,11 +9,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "k8s.io/mount-utils"
)
type Volume struct {
VolumeId string
- TargetPath string
+ StagedPath string
mounter Mounter
unmounter Unmounter
@@ -30,31 +31,55 @@ func NewVolume(volumeID string, mounter Mounter) *Volume {
}
}
-func (vol *Volume) Publish(targetPath string) error {
+func (vol *Volume) Stage(stagingTargetPath string) error {
// check whether it can be mounted
- if isMnt, err := checkMount(targetPath); err != nil {
+ if isMnt, err := checkMount(stagingTargetPath); err != nil {
return err
} else if isMnt {
// try to unmount before mounting again
- _ = mountutil.Unmount(targetPath)
+ _ = mountutil.Unmount(stagingTargetPath)
}
- if u, err := vol.mounter.Mount(targetPath); err == nil {
- if vol.TargetPath != "" {
- if vol.TargetPath == targetPath {
- glog.Warningf("target path is already set to %s for volume %s", vol.TargetPath, vol.VolumeId)
+ if u, err := vol.mounter.Mount(stagingTargetPath); err == nil {
+ if vol.StagedPath != "" {
+ if vol.StagedPath == stagingTargetPath {
+ glog.Warningf("staged path is already set to %s for volume %s", vol.StagedPath, vol.VolumeId)
} else {
- glog.Warningf("target path is already set to %s and differs from %s for volume %s", vol.TargetPath, targetPath, vol.VolumeId)
+ glog.Warningf("staged path is already set to %s and differs from %s for volume %s", vol.StagedPath, stagingTargetPath, vol.VolumeId)
}
}
- vol.TargetPath = targetPath
+
+ vol.StagedPath = stagingTargetPath
vol.unmounter = u
+
return nil
} else {
return err
}
}
+func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly bool) error {
+ // check whether it can be mounted
+ if isMnt, err := checkMount(targetPath); err != nil {
+ return err
+ } else if isMnt {
+ // maybe already mounted?
+ return nil
+ }
+
+ // Use bind mount to create an alias of the real mount point.
+ mountOptions := []string{"bind"}
+ if readOnly {
+ mountOptions = append(mountOptions, "ro")
+ }
+
+ if err := mountutil.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (vol *Volume) Quota(sizeByte int64) error {
target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
@@ -73,24 +98,33 @@ func (vol *Volume) Quota(sizeByte int64) error {
}
func (vol *Volume) Unpublish(targetPath string) error {
- glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, targetPath)
+ // Try unmounting target path and deleting it.
+ if err := mount.CleanupMountPoint(targetPath, mountutil, true); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vol *Volume) Unstage(stagingTargetPath string) error {
+ glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath)
if vol.unmounter == nil {
- glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, targetPath)
+ glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath)
return nil
}
- if targetPath != vol.TargetPath {
- glog.Warningf("staging path %s differs for volume %s at %s", targetPath, vol.VolumeId, vol.TargetPath)
+ if stagingTargetPath != vol.StagedPath {
+ glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
}
if err := vol.unmounter.Unmount(); err != nil {
- glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err)
+ glog.Errorf("error unmounting volume during unstage: %s, err: %v", stagingTargetPath, err)
return err
}
- if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
- glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, targetPath, err)
+ if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) {
+ glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, stagingTargetPath, err)
return err
}