diff options
| author | chrislusf <chris.lu@gmail.com> | 2025-12-03 19:36:35 -0800 |
|---|---|---|
| committer | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-03 20:52:27 -0800 |
| commit | 05ac88f67723f1a6ce60543413f3ae59d2e85654 (patch) | |
| tree | 2030b7f8be258b90c07da00cb4a9a1e593abe966 | |
| parent | e76bd693e2022ac71f857548b0919155ebb04ca9 (diff) | |
| download | seaweedfs-csi-driver-05ac88f67723f1a6ce60543413f3ae59d2e85654.tar.xz seaweedfs-csi-driver-05ac88f67723f1a6ce60543413f3ae59d2e85654.zip | |
refactor: address code review feedback
- Handle unexpected stat errors in cleanupStaleStagingPath (high priority)
- Extract staging logic into stageNewVolume helper method for reuse
- Extract isReadOnlyAccessMode helper to avoid duplicated read-only checks
- Remove redundant mountutil.Unmount call (CleanupMountPoint already handles it)
| -rw-r--r-- | pkg/driver/mount_util.go | 10 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 85 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 7 |
3 files changed, 43 insertions, 59 deletions
diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go index 049d4dc..200817d 100644 --- a/pkg/driver/mount_util.go +++ b/pkg/driver/mount_util.go @@ -83,10 +83,14 @@ func cleanupStaleStagingPath(stagingPath string) error { // If stat fails with a different error (like corrupted mount), try force cleanup if mount.IsCorruptedMnt(err) { // Force unmount for corrupted mounts - if err := mount.CleanupMountPoint(stagingPath, mountutil, true); err != nil { - glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, err) - return err + if cleanupErr := mount.CleanupMountPoint(stagingPath, mountutil, true); cleanupErr != nil { + glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, cleanupErr) + return cleanupErr } + } else { + // stat failed with an unexpected error, return it + glog.Warningf("stat on staging path %s failed during cleanup: %v", stagingPath, err) + return err } } diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 43d0c1b..b9059fe 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -82,16 +82,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol volContext := req.GetVolumeContext() readOnly := isVolumeReadOnly(req) - mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) + volume, err := ns.stageNewVolume(volumeID, stagingTargetPath, volContext, readOnly) if err != nil { - // node stage is unsuccessfull - ns.removeVolumeMutex(volumeID) - return nil, err - } - - volume := NewVolume(volumeID, mounter) - if err := volume.Stage(stagingTargetPath); err != nil { - // node stage is unsuccessfull + // node stage is unsuccessful ns.removeVolumeMutex(volumeID) if os.IsPermission(err) { @@ -103,17 +96,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.Internal, err.Error()) } - // In seaweedfs quota is not configured on seaweedfs servers. - // Quota is applied only per mount. - // Previously we used to cmdline parameter to apply it, but such way does not allow dynamic resizing. - if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil { - if err := volume.Quota(capacity); err != nil { - return nil, err - } - } else { - glog.Infof("orchestration system is not compatible with the k8s api, error is: %s", err) - } - ns.volumes.Store(volumeID, volume) glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) @@ -170,27 +152,15 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis glog.Warningf("failed to cleanup stale staging path %s: %v", stagingTargetPath, err) } - // Re-stage the volume + // Re-stage the volume using the shared helper volContext := req.GetVolumeContext() readOnly := isPublishVolumeReadOnly(req) - mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) + newVolume, err := ns.stageNewVolume(volumeID, stagingTargetPath, volContext, readOnly) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to create mounter for re-staging: %v", err) - } - - newVolume := NewVolume(volumeID, mounter) - if err := newVolume.Stage(stagingTargetPath); err != nil { return nil, status.Errorf(codes.Internal, "failed to re-stage volume: %v", err) } - // Apply quota if available - if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil { - if err := newVolume.Quota(capacity); err != nil { - glog.Warningf("failed to apply quota during re-staging: %v", err) - } - } - ns.volumes.Store(volumeID, newVolume) volume = newVolume glog.Infof("volume %s successfully re-staged to %s", volumeID, stagingTargetPath) @@ -225,20 +195,7 @@ func isPublishVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool { if req.GetReadonly() { return true } - - mode := req.GetVolumeCapability().GetAccessMode().Mode - readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ - csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY, - csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, - } - - for _, readOnlyMode := range readOnlyModes { - if mode == readOnlyMode { - return true - } - } - - return false + return isReadOnlyAccessMode(req.GetVolumeCapability().GetAccessMode().Mode) } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { @@ -401,9 +358,33 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) { ns.volumeMutexes.RemoveMutex(volumeID) } -func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { - mode := req.GetVolumeCapability().GetAccessMode().Mode +// stageNewVolume creates and stages a new volume with the given parameters. +// This is a helper method used by both NodeStageVolume and NodePublishVolume (for re-staging). +func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volContext map[string]string, readOnly bool) (*Volume, error) { + mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) + if err != nil { + return nil, err + } + + volume := NewVolume(volumeID, mounter) + if err := volume.Stage(stagingTargetPath); err != nil { + return nil, err + } + // Apply quota if available + if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil { + if err := volume.Quota(capacity); err != nil { + glog.Warningf("failed to apply quota for volume %s: %v", volumeID, err) + } + } else { + glog.V(4).Infof("orchestration system is not compatible with the k8s api, error is: %s", err) + } + + return volume, nil +} + +// isReadOnlyAccessMode checks if the given access mode is read-only. +func isReadOnlyAccessMode(mode csi.VolumeCapability_AccessMode_Mode) bool { readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY, csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, @@ -417,3 +398,7 @@ func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { return false } + +func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { + return isReadOnlyAccessMode(req.GetVolumeCapability().GetAccessMode().Mode) +} diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 8abf88b..023abf9 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -123,12 +123,7 @@ func (vol *Volume) Unstage(stagingTargetPath string) error { // after a CSI driver restart. In this case, we need to force unmount. glog.Infof("volume %s has no unmounter (rebuilt from existing mount), using force unmount", vol.VolumeId) - // Try to unmount the staging path - if err := mountutil.Unmount(stagingTargetPath); err != nil { - glog.Warningf("error force unmounting volume %s: %v", vol.VolumeId, err) - } - - // Clean up using mount utilities + // Clean up using mount utilities. This will also handle unmounting. if err := mount.CleanupMountPoint(stagingTargetPath, mountutil, true); err != nil { glog.Warningf("error cleaning up mount point for volume %s: %v", vol.VolumeId, err) } |
