aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorchrislusf <chris.lu@gmail.com>2025-12-03 19:36:35 -0800
committerChris Lu <chrislusf@users.noreply.github.com>2025-12-03 20:52:27 -0800
commit05ac88f67723f1a6ce60543413f3ae59d2e85654 (patch)
tree2030b7f8be258b90c07da00cb4a9a1e593abe966 /pkg
parente76bd693e2022ac71f857548b0919155ebb04ca9 (diff)
downloadseaweedfs-csi-driver-05ac88f67723f1a6ce60543413f3ae59d2e85654.tar.xz
seaweedfs-csi-driver-05ac88f67723f1a6ce60543413f3ae59d2e85654.zip
refactor: address code review feedback
- Handle unexpected stat errors in cleanupStaleStagingPath (high priority) - Extract staging logic into stageNewVolume helper method for reuse - Extract isReadOnlyAccessMode helper to avoid duplicated read-only checks - Remove redundant mountutil.Unmount call (CleanupMountPoint already handles it)
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/mount_util.go10
-rw-r--r--pkg/driver/nodeserver.go85
-rw-r--r--pkg/driver/volume.go7
3 files changed, 43 insertions, 59 deletions
diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go
index 049d4dc..200817d 100644
--- a/pkg/driver/mount_util.go
+++ b/pkg/driver/mount_util.go
@@ -83,10 +83,14 @@ func cleanupStaleStagingPath(stagingPath string) error {
// If stat fails with a different error (like corrupted mount), try force cleanup
if mount.IsCorruptedMnt(err) {
// Force unmount for corrupted mounts
- if err := mount.CleanupMountPoint(stagingPath, mountutil, true); err != nil {
- glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, err)
- return err
+ if cleanupErr := mount.CleanupMountPoint(stagingPath, mountutil, true); cleanupErr != nil {
+ glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, cleanupErr)
+ return cleanupErr
}
+ } else {
+ // stat failed with an unexpected error, return it
+ glog.Warningf("stat on staging path %s failed during cleanup: %v", stagingPath, err)
+ return err
}
}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 43d0c1b..b9059fe 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -82,16 +82,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
volContext := req.GetVolumeContext()
readOnly := isVolumeReadOnly(req)
- mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
+ volume, err := ns.stageNewVolume(volumeID, stagingTargetPath, volContext, readOnly)
if err != nil {
- // node stage is unsuccessfull
- ns.removeVolumeMutex(volumeID)
- return nil, err
- }
-
- volume := NewVolume(volumeID, mounter)
- if err := volume.Stage(stagingTargetPath); err != nil {
- // node stage is unsuccessfull
+ // node stage is unsuccessful
ns.removeVolumeMutex(volumeID)
if os.IsPermission(err) {
@@ -103,17 +96,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.Internal, err.Error())
}
- // In seaweedfs quota is not configured on seaweedfs servers.
- // Quota is applied only per mount.
- // Previously we used to cmdline parameter to apply it, but such way does not allow dynamic resizing.
- if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
- if err := volume.Quota(capacity); err != nil {
- return nil, err
- }
- } else {
- glog.Infof("orchestration system is not compatible with the k8s api, error is: %s", err)
- }
-
ns.volumes.Store(volumeID, volume)
glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)
@@ -170,27 +152,15 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.Warningf("failed to cleanup stale staging path %s: %v", stagingTargetPath, err)
}
- // Re-stage the volume
+ // Re-stage the volume using the shared helper
volContext := req.GetVolumeContext()
readOnly := isPublishVolumeReadOnly(req)
- mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
+ newVolume, err := ns.stageNewVolume(volumeID, stagingTargetPath, volContext, readOnly)
if err != nil {
- return nil, status.Errorf(codes.Internal, "failed to create mounter for re-staging: %v", err)
- }
-
- newVolume := NewVolume(volumeID, mounter)
- if err := newVolume.Stage(stagingTargetPath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to re-stage volume: %v", err)
}
- // Apply quota if available
- if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
- if err := newVolume.Quota(capacity); err != nil {
- glog.Warningf("failed to apply quota during re-staging: %v", err)
- }
- }
-
ns.volumes.Store(volumeID, newVolume)
volume = newVolume
glog.Infof("volume %s successfully re-staged to %s", volumeID, stagingTargetPath)
@@ -225,20 +195,7 @@ func isPublishVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool {
if req.GetReadonly() {
return true
}
-
- mode := req.GetVolumeCapability().GetAccessMode().Mode
- readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
- csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
- csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
- }
-
- for _, readOnlyMode := range readOnlyModes {
- if mode == readOnlyMode {
- return true
- }
- }
-
- return false
+ return isReadOnlyAccessMode(req.GetVolumeCapability().GetAccessMode().Mode)
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
@@ -401,9 +358,33 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) {
ns.volumeMutexes.RemoveMutex(volumeID)
}
-func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
- mode := req.GetVolumeCapability().GetAccessMode().Mode
+// stageNewVolume creates and stages a new volume with the given parameters.
+// This is a helper method used by both NodeStageVolume and NodePublishVolume (for re-staging).
+func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volContext map[string]string, readOnly bool) (*Volume, error) {
+ mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
+ if err != nil {
+ return nil, err
+ }
+
+ volume := NewVolume(volumeID, mounter)
+ if err := volume.Stage(stagingTargetPath); err != nil {
+ return nil, err
+ }
+ // Apply quota if available
+ if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
+ if err := volume.Quota(capacity); err != nil {
+ glog.Warningf("failed to apply quota for volume %s: %v", volumeID, err)
+ }
+ } else {
+ glog.V(4).Infof("orchestration system is not compatible with the k8s api, error is: %s", err)
+ }
+
+ return volume, nil
+}
+
+// isReadOnlyAccessMode checks if the given access mode is read-only.
+func isReadOnlyAccessMode(mode csi.VolumeCapability_AccessMode_Mode) bool {
readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
@@ -417,3 +398,7 @@ func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
return false
}
+
+func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
+ return isReadOnlyAccessMode(req.GetVolumeCapability().GetAccessMode().Mode)
+}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 8abf88b..023abf9 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -123,12 +123,7 @@ func (vol *Volume) Unstage(stagingTargetPath string) error {
// after a CSI driver restart. In this case, we need to force unmount.
glog.Infof("volume %s has no unmounter (rebuilt from existing mount), using force unmount", vol.VolumeId)
- // Try to unmount the staging path
- if err := mountutil.Unmount(stagingTargetPath); err != nil {
- glog.Warningf("error force unmounting volume %s: %v", vol.VolumeId, err)
- }
-
- // Clean up using mount utilities
+ // Clean up using mount utilities. This will also handle unmounting.
if err := mount.CleanupMountPoint(stagingTargetPath, mountutil, true); err != nil {
glog.Warningf("error cleaning up mount point for volume %s: %v", vol.VolumeId, err)
}