aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/driver')
-rw-r--r--pkg/driver/nodeserver.go68
-rw-r--r--pkg/driver/volume.go73
2 files changed, 75 insertions, 66 deletions
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index c0572a8..d4e95f9 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -26,6 +26,17 @@ type NodeServer struct {
var _ = csi.NodeServer(&NodeServer{})
+func (ns *NodeServer) getVolume(volumeID string) *Volume {
+ if volume, ok := ns.volumes.Load(volumeID); ok {
+ return volume.(*Volume)
+ }
+ return nil
+}
+
+func (ns *NodeServer) setVolume(volumeID string, volume *Volume) {
+ ns.volumes.Store(volumeID, volume)
+}
+
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
@@ -51,24 +62,28 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeMutex.Lock()
defer volumeMutex.Unlock()
+ volume := ns.getVolume(volumeID)
+ if volume == nil {
+ volume = NewVolume(volumeID)
+ }
// The volume has been publish.
- if _, ok := ns.volumes.Load(volumeID); ok {
- glog.Infof("volume %s has been already published", volumeID)
- return &csi.NodePublishVolumeResponse{}, nil
+ for _, volumePath := range volume.volumePaths {
+ if volumePath.path == targetPath {
+ glog.Infof("volume %s has been already published", volumeID)
+ return &csi.NodePublishVolumeResponse{}, nil
+ }
}
- volContext := req.GetVolumeContext()
- readOnly := isVolumeReadOnly(req)
-
- mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
- if err != nil {
- // node publish is unsuccessfull
+ volumePath := &VolumePath{path: targetPath, volumeId: volumeID}
+ if mounter, err := newMounter(volumeID, isVolumeReadOnly(req), ns.Driver, req.GetVolumeContext()); err != nil {
ns.removeVolumeMutex(volumeID)
return nil, err
+ } else {
+ volumePath.mounter = mounter
+ volume.volumePaths = append(volume.volumePaths, volumePath)
}
- volume := NewVolume(volumeID, mounter)
- if err := volume.Publish(targetPath); err != nil {
+ if err := volume.Publish(volumePath); err != nil {
// node publish is unsuccessfull
ns.removeVolumeMutex(volumeID)
@@ -89,7 +104,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err
}
- ns.volumes.Store(volumeID, volume)
+ ns.setVolume(volumeID, volume)
glog.Infof("volume %s successfully publish to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
@@ -113,18 +128,19 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
volumeMutex.Lock()
defer volumeMutex.Unlock()
- volume, ok := ns.volumes.Load(volumeID)
- if !ok {
+ if volume := ns.getVolume(volumeID); volume != nil {
+ if err := volume.Unpublish(targetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ } else {
+ if len(volume.volumePaths) == 0 {
+ ns.volumes.Delete(volumeID)
+ }
+ }
+ } else {
glog.Warningf("volume %s hasn't been published", volumeID)
// make sure there is no any garbage
_ = mount.CleanupMountPoint(targetPath, mountutil, true)
- } else {
- if err := volume.(*Volume).Unpublish(targetPath); err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- } else {
- ns.volumes.Delete(volumeID)
- }
}
// remove mutex on successfull unpublish
@@ -180,8 +196,8 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
volumeMutex.Lock()
defer volumeMutex.Unlock()
- if volume, ok := ns.volumes.Load(volumeID); ok {
- if err := volume.(*Volume).Quota(requiredBytes); err != nil {
+ if volume := ns.getVolume(volumeID); volume != nil {
+ if err := volume.Quota(requiredBytes); err != nil {
return nil, err
}
}
@@ -192,11 +208,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
func (ns *NodeServer) NodeCleanup() {
ns.volumes.Range(func(_, vol any) bool {
v := vol.(*Volume)
- if len(v.TargetPath) > 0 {
- glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath)
- err := v.Unpublish(v.TargetPath)
+ for _, volumePath := range v.volumePaths {
+ glog.Infof("cleaning up volume %s at %s", v.VolumeId, volumePath.path)
+ err := v.Unpublish(volumePath.path)
if err != nil {
- glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err)
+ glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, volumePath.path, err)
}
}
return true
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 38d1308..f1d47ad 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -3,52 +3,48 @@ package driver
import (
"context"
"fmt"
- "os"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "k8s.io/mount-utils"
)
-type Volume struct {
- VolumeId string
- TargetPath string
-
+type VolumePath struct {
+ path string
mounter Mounter
unmounter Unmounter
+ volumeId string
+}
+
+type Volume struct {
+ VolumeId string
// unix socket used to manage volume
localSocket string
+ volumePaths []*VolumePath
}
-func NewVolume(volumeID string, mounter Mounter) *Volume {
+func NewVolume(volumeID string) *Volume {
return &Volume{
VolumeId: volumeID,
- mounter: mounter,
localSocket: GetLocalSocket(volumeID),
}
}
-func (vol *Volume) Publish(targetPath string) error {
+func (vol *Volume) Publish(volumePath *VolumePath) error {
// check whether it can be mounted
- if isMnt, err := checkMount(targetPath); err != nil {
+ if isMnt, err := checkMount(volumePath.path); err != nil {
return err
} else if isMnt {
// try to unmount before mounting again
- _ = mountutil.Unmount(targetPath)
+ _ = mountutil.Unmount(volumePath.path)
}
- if u, err := vol.mounter.Mount(targetPath); err == nil {
- if vol.TargetPath != "" {
- if vol.TargetPath == targetPath {
- glog.Warningf("target path is already set to %s for volume %s", vol.TargetPath, vol.VolumeId)
- } else {
- glog.Warningf("target path is already set to %s and differs from %s for volume %s", vol.TargetPath, targetPath, vol.VolumeId)
- }
- }
- vol.TargetPath = targetPath
- vol.unmounter = u
+ if unmounter, err := volumePath.mounter.Mount(volumePath.path); err == nil {
+ volumePath.unmounter = unmounter
+ vol.volumePaths = append(vol.volumePaths, volumePath)
return nil
} else {
return err
@@ -74,25 +70,22 @@ func (vol *Volume) Quota(sizeByte int64) error {
func (vol *Volume) Unpublish(targetPath string) error {
glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, targetPath)
-
- if vol.unmounter == nil {
- glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, targetPath)
- return nil
- }
-
- if targetPath != vol.TargetPath {
- glog.Warningf("staging path %s differs for volume %s at %s", targetPath, vol.VolumeId, vol.TargetPath)
- }
-
- if err := vol.unmounter.Unmount(); err != nil {
- glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err)
- return err
- }
-
- if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
- glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, targetPath, err)
- return err
+ for index, volumePath := range vol.volumePaths {
+ if volumePath.path == targetPath {
+ vol.volumePaths = append(vol.volumePaths[:index], vol.volumePaths[index+1:]...)
+ if volumePath.unmounter != nil {
+ err := volumePath.unmounter.Unmount()
+ if err != nil {
+ glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err)
+ } else { // unmount success
+ return nil
+ }
+ } else {
+ glog.Errorf("volume %s is no mounter, path: %s", vol.VolumeId, targetPath)
+ }
+ break
+ }
}
-
- return nil
+ glog.Warningf("volume %s cannot use unmounter, use default cleanup mount point %s", targetPath, vol.VolumeId)
+ return mount.CleanupMountPoint(targetPath, mountutil, true)
}