From e76bd693e2022ac71f857548b0919155ebb04ca9 Mon Sep 17 00:00:00 2001
From: chrislusf
Date: Wed, 3 Dec 2025 19:31:12 -0800
Subject: fix: add self-healing for volume mount failures after driver restart

This addresses issue #203 - CSI Driver Self-Healing for Volume Mount Failures.

Problem: When the CSI node driver restarts, the in-memory volume cache is
lost. Kubelet then directly calls NodePublishVolume (skipping
NodeStageVolume), which fails with a 'volume hasn't been staged yet' error.

Solution:
1. Added isStagingPathHealthy() to detect healthy vs stale/corrupted mounts
2. Added cleanupStaleStagingPath() to clean up stale mount points
3. Enhanced NodeStageVolume to clean up stale mounts before staging
4. Implemented self-healing in NodePublishVolume:
   - If staging path is healthy: rebuild volume cache from existing mount
   - If staging path is stale: clean up and re-stage automatically
5. Updated Volume.Unstage to handle rebuilt volumes without an unmounter

Benefits:
- Automatic recovery after CSI driver restarts
- No manual intervention required (no kubelet/pod restarts needed)
- Handles both live and dead FUSE mount scenarios
- Backward compatible with normal operations

Fixes #203
---
 pkg/driver/mount_util.go |  85 ++++++++++++++++++++++++++++++++++++++
 pkg/driver/nodeserver.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++-
 pkg/driver/volume.go     |  23 ++++++++---
 3 files changed, 205 insertions(+), 7 deletions(-)

diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go
index b62a9e7..049d4dc 100644
--- a/pkg/driver/mount_util.go
+++ b/pkg/driver/mount_util.go
@@ -2,13 +2,98 @@ package driver

 import (
     "errors"
+    "os"
     "time"

+    "github.com/seaweedfs/seaweedfs/weed/glog"
     "k8s.io/mount-utils"
 )

 var mountutil = mount.New("")

+// isStagingPathHealthy checks if the staging path has a healthy FUSE mount.
+// It returns true if the path is mounted and accessible, false otherwise.
+func isStagingPathHealthy(stagingPath string) bool {
+    // Check if path exists
+    info, err := os.Stat(stagingPath)
+    if err != nil {
+        if os.IsNotExist(err) {
+            glog.V(4).Infof("staging path %s does not exist", stagingPath)
+            return false
+        }
+        // "Transport endpoint is not connected" or similar FUSE errors
+        if mount.IsCorruptedMnt(err) {
+            glog.Warningf("staging path %s has corrupted mount: %v", stagingPath, err)
+            return false
+        }
+        glog.V(4).Infof("staging path %s stat error: %v", stagingPath, err)
+        return false
+    }
+
+    // Check if it's a directory
+    if !info.IsDir() {
+        glog.Warningf("staging path %s is not a directory", stagingPath)
+        return false
+    }
+
+    // Check if it's a mount point
+    isMnt, err := mountutil.IsMountPoint(stagingPath)
+    if err != nil {
+        if mount.IsCorruptedMnt(err) {
+            glog.Warningf("staging path %s has corrupted mount point: %v", stagingPath, err)
+            return false
+        }
+        glog.V(4).Infof("staging path %s mount point check error: %v", stagingPath, err)
+        return false
+    }
+
+    if !isMnt {
+        glog.V(4).Infof("staging path %s is not a mount point", stagingPath)
+        return false
+    }
+
+    // Try to read the directory to verify FUSE is responsive
+    _, err = os.ReadDir(stagingPath)
+    if err != nil {
+        glog.Warningf("staging path %s is not readable (FUSE may be dead): %v", stagingPath, err)
+        return false
+    }
+
+    glog.V(4).Infof("staging path %s is healthy", stagingPath)
+    return true
+}
+
+// cleanupStaleStagingPath cleans up a stale or corrupted staging mount point.
+// It attempts to unmount and remove the directory.
+func cleanupStaleStagingPath(stagingPath string) error {
+    glog.Infof("cleaning up stale staging path %s", stagingPath)
+
+    // Try to unmount first (handles corrupted mounts)
+    if err := mountutil.Unmount(stagingPath); err != nil {
+        glog.V(4).Infof("unmount staging path %s (may already be unmounted): %v", stagingPath, err)
+    }
+
+    // Check if directory still exists and remove it
+    if _, err := os.Stat(stagingPath); err == nil {
+        if err := os.Remove(stagingPath); err != nil {
+            glog.Warningf("failed to remove staging path %s: %v", stagingPath, err)
+            return err
+        }
+    } else if !os.IsNotExist(err) {
+        // If stat fails with a different error (like corrupted mount), try force cleanup
+        if mount.IsCorruptedMnt(err) {
+            // Force unmount for corrupted mounts
+            if err := mount.CleanupMountPoint(stagingPath, mountutil, true); err != nil {
+                glog.Warningf("failed to cleanup corrupted mount point %s: %v", stagingPath, err)
+                return err
+            }
+        }
+    }
+
+    glog.Infof("successfully cleaned up staging path %s", stagingPath)
+    return nil
+}
+
 func waitForMount(path string, timeout time.Duration) error {
     var elapsed time.Duration
     var interval = 10 * time.Millisecond
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index eb81a0b..43d0c1b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -51,12 +51,34 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
     volumeMutex.Lock()
     defer volumeMutex.Unlock()

-    // The volume has been staged.
+    // The volume has been staged and is in memory cache.
     if _, ok := ns.volumes.Load(volumeID); ok {
         glog.Infof("volume %s has been already staged", volumeID)
         return &csi.NodeStageVolumeResponse{}, nil
     }

+    // Phase 2 Enhancement: Check if staging path exists but is stale/corrupted
+    // This handles cases where:
+    // 1. The CSI driver restarted and lost its in-memory state
+    // 2. The FUSE process died leaving a stale mount
+    if isStagingPathHealthy(stagingTargetPath) {
+        // The staging path is healthy - this means the FUSE mount is still active
+        // (possibly from before driver restart). We need to clean it up and re-stage
+        // because we don't have the unmounter reference to properly manage it.
+        glog.Infof("volume %s has existing healthy mount at %s, will re-stage to get proper control", volumeID, stagingTargetPath)
+        if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+            glog.Warningf("failed to cleanup existing healthy staging path %s: %v, will try to stage anyway", stagingTargetPath, err)
+        }
+    } else {
+        // Check if there's a stale/corrupted mount that needs cleanup
+        if _, err := os.Stat(stagingTargetPath); err == nil || mount.IsCorruptedMnt(err) {
+            glog.Infof("volume %s has stale staging path at %s, cleaning up", volumeID, stagingTargetPath)
+            if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+                glog.Warningf("failed to cleanup stale staging path %s: %v, will try to stage anyway", stagingTargetPath, err)
+            }
+        }
+    }
+
     volContext := req.GetVolumeContext()
     readOnly := isVolumeReadOnly(req)

@@ -128,7 +150,51 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

     volume, ok := ns.volumes.Load(volumeID)
     if !ok {
-        return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
+        // Phase 1: Self-healing for missing volume cache
+        // This handles the case where the CSI driver restarted and lost its in-memory state,
+        // but kubelet thinks the volume is already staged and directly calls NodePublishVolume.
+        glog.Infof("volume %s not found in cache, attempting self-healing", volumeID)
+
+        if isStagingPathHealthy(stagingTargetPath) {
+            // The staging path is healthy - rebuild volume cache from existing mount
+            glog.Infof("volume %s has healthy staging at %s, rebuilding cache", volumeID, stagingTargetPath)
+            volume = ns.rebuildVolumeFromStaging(volumeID, stagingTargetPath)
+            ns.volumes.Store(volumeID, volume)
+        } else {
+            // The staging path is not healthy - we need to re-stage the volume
+            // This requires volume context which we have from the request
+            glog.Infof("volume %s staging path %s is not healthy, re-staging", volumeID, stagingTargetPath)
+
+            // Clean up stale staging path if it exists
+            if err := cleanupStaleStagingPath(stagingTargetPath); err != nil {
+                glog.Warningf("failed to cleanup stale staging path %s: %v", stagingTargetPath, err)
+            }
+
+            // Re-stage the volume
+            volContext := req.GetVolumeContext()
+            readOnly := isPublishVolumeReadOnly(req)
+
+            mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
+            if err != nil {
+                return nil, status.Errorf(codes.Internal, "failed to create mounter for re-staging: %v", err)
+            }
+
+            newVolume := NewVolume(volumeID, mounter)
+            if err := newVolume.Stage(stagingTargetPath); err != nil {
+                return nil, status.Errorf(codes.Internal, "failed to re-stage volume: %v", err)
+            }
+
+            // Apply quota if available
+            if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
+                if err := newVolume.Quota(capacity); err != nil {
+                    glog.Warningf("failed to apply quota during re-staging: %v", err)
+                }
+            }
+
+            ns.volumes.Store(volumeID, newVolume)
+            volume = newVolume
+            glog.Infof("volume %s successfully re-staged to %s", volumeID, stagingTargetPath)
+        }
     }

     // When pod uses a volume in read-only mode, k8s will automatically
@@ -141,6 +207,40 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
     return &csi.NodePublishVolumeResponse{}, nil
 }

+// rebuildVolumeFromStaging creates a Volume struct from an existing healthy staging mount.
+// This is used for self-healing when the CSI driver restarts but the FUSE mount is still active.
+// Note: The returned Volume won't have an unmounter, so Unstage will need special handling.
+func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume {
+    return &Volume{
+        VolumeId:    volumeID,
+        StagedPath:  stagingPath,
+        localSocket: GetLocalSocket(volumeID),
+        // mounter and unmounter are nil - this is intentional
+        // The FUSE process is already running, we just need to track the volume
+    }
+}
+
+// isPublishVolumeReadOnly determines if a volume should be mounted read-only based on the publish request.
+func isPublishVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool {
+    if req.GetReadonly() {
+        return true
+    }
+
+    mode := req.GetVolumeCapability().GetAccessMode().Mode
+    readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
+        csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+        csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+    }
+
+    for _, readOnlyMode := range readOnlyModes {
+        if mode == readOnlyMode {
+            return true
+        }
+    }
+
+    return false
+}
+
 func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
     volumeID := req.GetVolumeId()
     targetPath := req.GetTargetPath()
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index ab0dcd4..8abf88b 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -114,13 +114,26 @@ func (vol *Volume) Unpublish(targetPath string) error {
 func (vol *Volume) Unstage(stagingTargetPath string) error {
     glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath)

-    if vol.unmounter == nil {
-        glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath)
-        return nil
+    if stagingTargetPath != vol.StagedPath && vol.StagedPath != "" {
+        glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
     }

-    if stagingTargetPath != vol.StagedPath {
-        glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
+    if vol.unmounter == nil {
+        // This can happen when the volume was rebuilt from an existing staging path
+        // after a CSI driver restart. In this case, we need to force unmount.
+        glog.Infof("volume %s has no unmounter (rebuilt from existing mount), using force unmount", vol.VolumeId)
+
+        // Try to unmount the staging path
+        if err := mountutil.Unmount(stagingTargetPath); err != nil {
+            glog.Warningf("error force unmounting volume %s: %v", vol.VolumeId, err)
+        }
+
+        // Clean up using mount utilities
+        if err := mount.CleanupMountPoint(stagingTargetPath, mountutil, true); err != nil {
+            glog.Warningf("error cleaning up mount point for volume %s: %v", vol.VolumeId, err)
+        }
+
+        return nil
     }

     if err := vol.unmounter.Unmount(); err != nil {
--
cgit v1.2.3
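
For reference, the healthy-vs-stale decision that the patch wires into NodeStageVolume and NodePublishVolume can be exercised on its own. The sketch below is illustrative only and not part of the patch: the staging path is a hypothetical example, and the stagingPathHealthy helper simply mirrors the checks made by isStagingPathHealthy and cleanupStaleStagingPath, using the same k8s.io/mount-utils calls that appear in the diff (IsMountPoint, Unmount, CleanupMountPoint).

package main

import (
    "fmt"
    "os"

    "k8s.io/mount-utils"
)

// stagingPathHealthy mirrors isStagingPathHealthy: the path must exist,
// be a directory, be a mount point, and be readable. A dead FUSE mount
// typically fails the stat or the read with "transport endpoint is not
// connected".
func stagingPathHealthy(mounter mount.Interface, path string) bool {
    info, err := os.Stat(path)
    if err != nil || !info.IsDir() {
        return false
    }
    isMnt, err := mounter.IsMountPoint(path)
    if err != nil || !isMnt {
        return false
    }
    _, err = os.ReadDir(path)
    return err == nil
}

func main() {
    // Hypothetical staging path; in the driver this value comes from the
    // request's StagingTargetPath field.
    stagingPath := "/var/lib/kubelet/plugins/kubernetes.io/csi/example/globalmount"
    mounter := mount.New("")

    if stagingPathHealthy(mounter, stagingPath) {
        // A healthy mount survived the driver restart: the node server can
        // rebuild its in-memory Volume entry instead of failing the publish.
        fmt.Println("staging path healthy: rebuild volume cache")
        return
    }

    // Stale or corrupted: best-effort unmount, then force cleanup of the
    // mount point, after which the volume would be re-staged.
    if err := mounter.Unmount(stagingPath); err != nil {
        fmt.Printf("unmount (may already be unmounted): %v\n", err)
    }
    if err := mount.CleanupMountPoint(stagingPath, mounter, true); err != nil {
        fmt.Printf("cleanup failed: %v\n", err)
        return
    }
    fmt.Println("stale staging path cleaned up: re-stage the volume")
}

Running the sketch requires the k8s.io/mount-utils module and, to observe anything beyond the "not a mount point" branch, root access on a node that actually has a FUSE staging mount.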