 pkg/driver/mount_util.go |  85
 pkg/driver/nodeserver.go | 104
 pkg/driver/volume.go     |  23
 3 files changed, 205 insertions(+), 7 deletions(-)
diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go
index b62a9e7..049d4dc 100644
--- a/pkg/driver/mount_util.go
+++ b/pkg/driver/mount_util.go
@@ -2,13 +2,98 @@ package driver
import (
"errors"
+ "os"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"k8s.io/mount-utils"
)
var mountutil = mount.New("")
+// isStagingPathHealthy checks if the staging path has a healthy FUSE mount.
+// It returns true if the path is mounted and accessible, false otherwise.
+func isStagingPathHealthy(stagingPath string) bool {
+ // Check if path exists
+ info, err := os.Stat(stagingPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ glog.V(4).Infof("staging path %s does not exist", stagingPath)
+ return false
+ }
+ // "Transport endpoint is not connected" or similar FUSE errors
+ if mount.IsCorruptedMnt(err) {
+ glog.Warningf("staging path %s has corrupted mount: %v", stagingPath, err)
+ return false
+ }
+ glog.V(4).Infof("staging path %s stat error: %v", stagingPath, err)
+ return false
+ }
+
+ // Check if it's a directory
+ if !info.IsDir() {
+ glog.Warningf("staging path %s is not a directory", stagingPath)
+ return false
+ }
+
+ // Check if it's a mount point
+ isMnt, err := mountutil.IsMountPoint(stagingPath)
+ if err != nil {
+ if mount.IsCorruptedMnt(err) {
+ glog.Warningf("staging path %s has corrupted mount point: %v", stagingPath, err)
+ return false
+ }
+ glog.V(4).Infof("staging path %s mount point check error: %v", stagingPath, err)
+ return false
+ }
+
+ if !isMnt {
+ glog.V(4).Infof("staging path %s is not a mount point", stagingPath)
+ return false
+ }
+
+ // Try to read the directory to verify FUSE is responsive
+ _, err = os.ReadDir(stagingPath)
+ if err != nil {
+ glog.Warningf("staging path %s is not readable (FUSE may be dead): %v", stagingPath, err)
+ return false
+ }
+
+ glog.V(4).Infof("staging path %s is healthy", stagingPath)
+ return true
+}
+
+// cleanupStaleStagingPath cleans up a stale or corrupted staging mount point.
+// It attempts to unmount and remove the directory.
+func cleanupStaleStagingPath(stagingPath string) error {
+ glog.Infof("cleaning up stale staging path %s", stagingPath)
+
+ // Try to unmount first (handles corrupted mounts)
+ if err := mountutil.Unmount(stagingPath); err != nil {
+ glog.V(4).Infof("unmount staging path %s (may already be unmounted): %v", stagingPath, err)
+ }
+
+ // Check if directory still exists and remove it
+ if _, err := os.Stat(stagingPath); err == nil {
+ if err := os.Remove(stagingPath); err != nil {
+ glog.Warningf("failed to remove staging path %s: %v", stagingPath, err)
+ return err
+ }
+ } else if !os.IsNotExist(err) {
+ // If stat fails with a different error (e.g. a corrupted mount), fall back to CleanupMountPoint
+ if mount.IsCorruptedMnt(err) {
+ // CleanupMountPoint unmounts and removes the mount point; the final
+ // argument enables the more thorough (extensive) mount-point check
+ if err := mount.CleanupMountPoint(stagingPath, mountutil, true); err != nil {
+ glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, err)
+ return err
+ }
+ }
+ }
+
+ glog.Infof("successfully cleaned up staging path %s", stagingPath)
+ return nil
+}
+
func waitForMount(path string, timeout time.Duration) error {
var elapsed time.Duration
var interval = 10 * time.Millisecond
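
A minimal sketch (not part of this change) of how the two helpers above could compose before (re)staging; ensureCleanStagingPath is a hypothetical wrapper named here only for illustration:

// ensureCleanStagingPath is an illustrative wrapper: it leaves stagingPath in a
// state that is safe to stage onto. A healthy-but-unmanaged mount (e.g. left
// over from before a driver restart) is torn down so the driver can re-stage
// and own the FUSE process; a stale or corrupted mount is cleaned up.
func ensureCleanStagingPath(stagingPath string) error {
	if isStagingPathHealthy(stagingPath) {
		return cleanupStaleStagingPath(stagingPath)
	}
	// Path exists but is unhealthy, or stat fails with a FUSE transport error.
	if _, err := os.Stat(stagingPath); err == nil || mount.IsCorruptedMnt(err) {
		return cleanupStaleStagingPath(stagingPath)
	}
	// Nothing to clean up.
	return nil
}
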
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index eb81a0b..43d0c1b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -51,12 +51,34 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
volumeMutex.Lock()
defer volumeMutex.Unlock()
- // The volume has been staged.
+ // The volume has already been staged and is tracked in the in-memory cache.
if _, ok := ns.volumes.Load(volumeID); ok {
glog.Infof("volume %s has been already staged", volumeID)
return &csi.NodeStageVolumeResponse{}, nil
}
+ // Phase 2 Enhancement: Check if staging path exists but is stale/corrupted
+ // This handles cases where:
+ // 1. The CSI driver restarted and lost its in-memory state
+ // 2. The FUSE process died leaving a stale mount
+ if isStagingPathHealthy(stagingTargetPath) {
+ // The staging path is healthy - this means the FUSE mount is still active
+ // (possibly from before driver restart). We need to clean it up and re-stage
+ // because we don't have the unmounter reference to properly manage it.
+ glog.Infof("volume %s has existing healthy mount at %s, will re-stage to get proper control", volumeID, stagingTargetPath)
+ if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+ glog.Warningf("failed to cleanup existing healthy staging path %s: %v, will try to stage anyway", stagingTargetPath, err)
+ }
+ } else {
+ // Check if there's a stale/corrupted mount that needs cleanup
+ if _, err := os.Stat(stagingTargetPath); err == nil || mount.IsCorruptedMnt(err) {
+ glog.Infof("volume %s has stale staging path at %s, cleaning up", volumeID, stagingTargetPath)
+ if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+ glog.Warningf("failed to cleanup stale staging path %s: %v, will try to stage anyway", stagingTargetPath, err)
+ }
+ }
+ }
+
volContext := req.GetVolumeContext()
readOnly := isVolumeReadOnly(req)
@@ -128,7 +150,51 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volume, ok := ns.volumes.Load(volumeID)
if !ok {
- return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
+ // Phase 1: Self-healing for missing volume cache
+ // This handles the case where the CSI driver restarted and lost its in-memory state,
+ // but kubelet thinks the volume is already staged and directly calls NodePublishVolume.
+ glog.Infof("volume %s not found in cache, attempting self-healing", volumeID)
+
+ if isStagingPathHealthy(stagingTargetPath) {
+ // The staging path is healthy - rebuild volume cache from existing mount
+ glog.Infof("volume %s has healthy staging at %s, rebuilding cache", volumeID, stagingTargetPath)
+ volume = ns.rebuildVolumeFromStaging(volumeID, stagingTargetPath)
+ ns.volumes.Store(volumeID, volume)
+ } else {
+ // The staging path is not healthy - we need to re-stage the volume
+ // This requires volume context which we have from the request
+ glog.Infof("volume %s staging path %s is not healthy, re-staging", volumeID, stagingTargetPath)
+
+ // Clean up stale staging path if it exists
+ if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+ glog.Warningf("failed to cleanup stale staging path %s: %v", stagingTargetPath, err)
+ }
+
+ // Re-stage the volume
+ volContext := req.GetVolumeContext()
+ readOnly := isPublishVolumeReadOnly(req)
+
+ mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "failed to create mounter for re-staging: %v", err)
+ }
+
+ newVolume := NewVolume(volumeID, mounter)
+ if err := newVolume.Stage(stagingTargetPath); err != nil {
+ return nil, status.Errorf(codes.Internal, "failed to re-stage volume: %v", err)
+ }
+
+ // Apply quota if available
+ if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
+ if err := newVolume.Quota(capacity); err != nil {
+ glog.Warningf("failed to apply quota during re-staging: %v", err)
+ }
+ }
+
+ ns.volumes.Store(volumeID, newVolume)
+ volume = newVolume
+ glog.Infof("volume %s successfully re-staged to %s", volumeID, stagingTargetPath)
+ }
}
// When pod uses a volume in read-only mode, k8s will automatically
@@ -141,6 +207,40 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}
+// rebuildVolumeFromStaging creates a Volume struct from an existing healthy staging mount.
+// This is used for self-healing when the CSI driver restarts but the FUSE mount is still active.
+// Note: The returned Volume won't have an unmounter, so Unstage will need special handling.
+func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume {
+ return &Volume{
+ VolumeId: volumeID,
+ StagedPath: stagingPath,
+ localSocket: GetLocalSocket(volumeID),
+ // mounter and unmounter are nil - this is intentional
+ // The FUSE process is already running, we just need to track the volume
+ }
+}
+
+// isPublishVolumeReadOnly determines if a volume should be mounted read-only based on the publish request.
+func isPublishVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool {
+ if req.GetReadonly() {
+ return true
+ }
+
+ // Use the nil-safe generated getters in case no access mode was supplied.
+ mode := req.GetVolumeCapability().GetAccessMode().GetMode()
+ readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
+ csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+ csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+ }
+
+ for _, readOnlyMode := range readOnlyModes {
+ if mode == readOnlyMode {
+ return true
+ }
+ }
+
+ return false
+}
+
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
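
A hedged, table-driven test sketch for isPublishVolumeReadOnly, assuming the standard CSI spec Go bindings (github.com/container-storage-interface/spec/lib/go/csi); the test and its cases are illustrative, not part of this change:

package driver

import (
	"testing"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func TestIsPublishVolumeReadOnly(t *testing.T) {
	cases := []struct {
		name string
		req  *csi.NodePublishVolumeRequest
		want bool
	}{
		{
			name: "explicit readonly flag",
			req:  &csi.NodePublishVolumeRequest{Readonly: true},
			want: true,
		},
		{
			name: "reader-only access mode",
			req: &csi.NodePublishVolumeRequest{
				VolumeCapability: &csi.VolumeCapability{
					AccessMode: &csi.VolumeCapability_AccessMode{
						Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
					},
				},
			},
			want: true,
		},
		{
			name: "multi-writer access mode",
			req: &csi.NodePublishVolumeRequest{
				VolumeCapability: &csi.VolumeCapability{
					AccessMode: &csi.VolumeCapability_AccessMode{
						Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
					},
				},
			},
			want: false,
		},
	}
	for _, c := range cases {
		if got := isPublishVolumeReadOnly(c.req); got != c.want {
			t.Errorf("%s: got %v, want %v", c.name, got, c.want)
		}
	}
}
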
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index ab0dcd4..8abf88b 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -114,13 +114,26 @@ func (vol *Volume) Unpublish(targetPath string) error {
func (vol *Volume) Unstage(stagingTargetPath string) error {
glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath)
- if vol.unmounter == nil {
- glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath)
- return nil
+ if stagingTargetPath != vol.StagedPath && vol.StagedPath != "" {
+ glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
}
- if stagingTargetPath != vol.StagedPath {
- glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
+ if vol.unmounter == nil {
+ // This can happen when the volume was rebuilt from an existing staging path
+ // after a CSI driver restart. In this case, we need to force unmount.
+ glog.Infof("volume %s has no unmounter (rebuilt from existing mount), using force unmount", vol.VolumeId)
+
+ // Try to unmount the staging path
+ if err := mountutil.Unmount(stagingTargetPath); err != nil {
+ glog.Warningf("error force unmounting volume %s: %v", vol.VolumeId, err)
+ }
+
+ // CleanupMountPoint unmounts the path if still mounted and removes the
+ // directory; the final argument enables the extensive mount-point check
+ if err := mount.CleanupMountPoint(stagingTargetPath, mountutil, true); err != nil {
+ glog.Warningf("error cleaning up mount point for volume %s: %v", vol.VolumeId, err)
+ }
+
+ return nil
}
if err := vol.unmounter.Unmount(); err != nil {
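
The unmounter-less fallback above could equally be factored into a small helper. The sketch below is illustrative only and assumes that the final argument of mount.CleanupMountPoint in k8s.io/mount-utils is the extensive mount-point check rather than a force flag:

// forceUnstage mirrors the fallback above: it tears down a staging mount for a
// volume that was rebuilt without an unmounter (the driver restarted while the
// FUSE process kept running, or the mount was left behind by a dead process).
func forceUnstage(stagingTargetPath string) error {
	// Best-effort unmount first; a dead FUSE daemon often leaves a mount that
	// only reports "transport endpoint is not connected".
	if err := mountutil.Unmount(stagingTargetPath); err != nil {
		glog.Warningf("force unmount of %s failed: %v", stagingTargetPath, err)
	}
	// CleanupMountPoint unmounts again if needed and removes the directory.
	return mount.CleanupMountPoint(stagingTargetPath, mountutil, true)
}
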