diff options
| author | Viktor Kuzmin <kvaster@gmail.com> | 2022-09-03 11:40:32 +0300 |
|---|---|---|
| committer | Viktor Kuzmin <kvaster@gmail.com> | 2022-09-03 11:40:32 +0300 |
| commit | bc49fc11ffe8c687c5d4ea46f25ead9191155ec0 (patch) | |
| tree | 901e66a4e89fb8d23648ca51c438f74c13bfcc35 /pkg/driver | |
| parent | 3e3d202acdc9cb523ae3f7e71b3e1b595f4d5450 (diff) | |
| download | seaweedfs-csi-driver-bc49fc11ffe8c687c5d4ea46f25ead9191155ec0.tar.xz seaweedfs-csi-driver-bc49fc11ffe8c687c5d4ea46f25ead9191155ec0.zip | |
Remove linux specific magic from fuse process start/stop.
Use pid from cmd.Process instead of /proc lookup
Use mount specific mutex
Log fuse mount process stderr and stdout for problems investigation
Diffstat (limited to 'pkg/driver')
| -rw-r--r-- | pkg/driver/mount_util.go | 60 | ||||
| -rw-r--r-- | pkg/driver/mounter.go | 91 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 6 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 31 | ||||
| -rw-r--r-- | pkg/driver/utils.go | 35 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 64 |
6 files changed, 131 insertions, 156 deletions
diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go index af72840..561a499 100644 --- a/pkg/driver/mount_util.go +++ b/pkg/driver/mount_util.go @@ -2,43 +2,11 @@ package driver import ( "errors" - "fmt" - "io/ioutil" - "os" - "strings" - "syscall" "time" - "github.com/mitchellh/go-ps" - "github.com/seaweedfs/seaweedfs/weed/glog" "k8s.io/utils/mount" ) -func waitForProcess(p *os.Process, backoff int) error { - if backoff == 20 { - return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) - } - cmdLine, err := getCmdLine(p.Pid) - if err != nil { - glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) - return nil - } - if cmdLine == "" { - // ignore defunct processes - // TODO: debug why this happens in the first place - // seems to only happen on k8s, not on local docker - glog.Warning("Fuse process seems dead, returning") - return nil - } - if err := p.Signal(syscall.Signal(0)); err != nil { - glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) - return nil - } - glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) - time.Sleep(time.Duration(backoff*100) * time.Millisecond) - return waitForProcess(p, backoff+1) -} - func waitForMount(path string, timeout time.Duration) error { var elapsed time.Duration var interval = 10 * time.Millisecond @@ -57,31 +25,3 @@ func waitForMount(path string, timeout time.Duration) error { } } } - -func findFuseMountProcess(path string) (*os.Process, error) { - processes, err := ps.Processes() - if err != nil { - return nil, err - } - for _, p := range processes { - cmdLine, err := getCmdLine(p.Pid()) - if err != nil { - glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) - continue - } - if strings.Contains(cmdLine, path) { - glog.Infof("Found matching pid %v on path %s", p.Pid(), path) - return os.FindProcess(p.Pid()) - } - } - return nil, nil -} - -func getCmdLine(pid int) (string, error) { - cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) - cmdLine, err := ioutil.ReadFile(cmdLineFile) - if err != nil { - return "", err - } - return string(cmdLine), nil -} diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index 53ac26a..38baa26 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -1,7 +1,10 @@ package driver import ( + "context" "fmt" + "os" + "syscall" "time" "os/exec" @@ -16,8 +19,19 @@ type Config struct { Filer string } +type Mount interface { + Unmount() error +} + type Mounter interface { - Mount(target string) error + Mount(target string) (Mount, error) +} + +type fuseMount struct { + path string + cmd *exec.Cmd + + finished chan struct{} } func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { @@ -34,52 +48,83 @@ func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volCont return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext) } -func fuseMount(path string, command string, args []string) error { +func newFuseMount(path string, command string, args []string) (Mount, error) { cmd := exec.Command(command, args...) glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args) + // log fuse process messages - we need an easy way to investigate crashes in case it happens + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + err := cmd.Start() if err != nil { glog.Errorf("running weed mount: %v", err) - return fmt.Errorf("Error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err) + return nil, fmt.Errorf("Error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err) + } + + m := &fuseMount{ + path: path, + cmd: cmd, + + finished: make(chan struct{}), } // avoid zombie processes go func() { if err := cmd.Wait(); err != nil { - glog.Errorf("weed mount process %d exit: %v", cmd.Process.Pid, err) + glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err) + } else { + glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path) } + + close(m.finished) }() - return waitForMount(path, 10*time.Second) + if err = waitForMount(path, 10*time.Second); err != nil { + glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path) + + _ = m.finish(time.Second * 10) + return nil, err + } else { + return m, nil + } } -func fuseUnmount(path string) error { - m := mount.New("") +func (fm *fuseMount) finish(timeout time.Duration) error { + // ignore error, just inform we want process to exit + _ = fm.cmd.Process.Signal(syscall.Signal(1)) - if ok, err := m.IsLikelyNotMountPoint(path); !ok || mount.IsCorruptedMnt(err) { - if err := m.Unmount(path); err != nil { + if err := fm.waitFinished(timeout); err != nil { + glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fm.cmd.Process.Pid, fm.path) + _ = fm.cmd.Process.Kill() + if err = fm.waitFinished(time.Second * 1); err != nil { + glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fm.cmd.Process.Pid, fm.path) return err } } - // as fuse quits immediately, we will try to wait until the process is done - process, err := findFuseMountProcess(path) - if err != nil { - glog.Errorf("Error getting PID of fuse mount: %s", err) - return nil - } - if process == nil { - glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) + return nil +} + +func (fm *fuseMount) waitFinished(timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { + case <-ctx.Done(): + return context.DeadlineExceeded + case <-fm.finished: return nil } - glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) - return waitForProcess(process, 1) } -func newConfigFromSecrets(secrets map[string]string) *Config { - t := &Config{ - Filer: secrets["filer"], +func (fm *fuseMount) Unmount() error { + m := mount.New("") + + if ok, err := m.IsLikelyNotMountPoint(fm.path); !ok || mount.IsCorruptedMnt(err) { + if err := m.Unmount(fm.path); err != nil { + return err + } } - return t + + return fm.finish(time.Second * 30) } diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index 641ad1f..c9620f8 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -34,7 +34,7 @@ func newSeaweedFsMounter(volumeID string, path string, collection string, readOn }, nil } -func (seaweedFs *seaweedFsMounter) Mount(target string) error { +func (seaweedFs *seaweedFsMounter) Mount(target string) (Mount, error) { glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target) var filers []string @@ -91,11 +91,11 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error { args = append(args, fmt.Sprintf("-map.gid=%s", seaweedFs.driver.GidMap)) } - err := fuseMount(target, seaweedFsCmd, args) + m, err := newFuseMount(target, seaweedFsCmd, args) if err != nil { glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err) } - return err + return m, err } func GetLocalSocket(volumeID string) string { diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 38ba34f..3f18202 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -60,11 +60,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) if err != nil { + // node stage is unsuccessfull + ns.removeVolumeMutex(volumeID) return nil, err } volume := NewVolume(volumeID, mounter) if err := volume.Stage(stagingTargetPath); err != nil { + // node stage is unsuccessfull + ns.removeVolumeMutex(volumeID) + if os.IsPermission(err) { return nil, status.Error(codes.PermissionDenied, err.Error()) } @@ -83,6 +88,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath) @@ -99,6 +105,9 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + } volumeMutex := ns.getVolumeMutex(volumeID) volumeMutex.Lock() @@ -111,7 +120,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // When pod uses a volume in read-only mode, k8s will automatically // mount the volume as a read-only file system. - if err := volume.(*Volume).Publish(targetPath, req.GetReadonly()); err != nil { + if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -215,16 +224,16 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag // make sure there is no any garbage mounter := mount.New("") _ = mount.CleanupMountPoint(stagingTargetPath, mounter, true) - - return &csi.NodeUnstageVolumeResponse{}, nil - } - - if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) } else { - ns.volumes.Delete(volumeID) + if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) + } } + // remove mutex on successfull unstage + ns.volumeMutexes.RemoveMutex(volumeID) return &csi.NodeUnstageVolumeResponse{}, nil } @@ -246,10 +255,14 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV return &csi.NodeExpandVolumeResponse{}, nil } -func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex { +func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex { return ns.volumeMutexes.GetMutex(volumeID) } +func (ns *NodeServer) removeVolumeMutex(volumeID string) { + ns.volumeMutexes.RemoveMutex(volumeID) +} + func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { mode := req.GetVolumeCapability().GetAccessMode().Mode diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go index c2e0c49..485d4da 100644 --- a/pkg/driver/utils.go +++ b/pkg/driver/utils.go @@ -8,17 +8,15 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/util" "golang.org/x/net/context" "google.golang.org/grpc" "k8s.io/utils/mount" ) func NewNodeServer(n *SeaweedFsDriver) *NodeServer { - return &NodeServer{ Driver: n, - volumeMutexes: NewKeyMutex(32), + volumeMutexes: NewKeyMutex(), } } @@ -65,13 +63,19 @@ func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h } func checkMount(targetPath string) (bool, error) { - notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + mounter := mount.New("") + notMnt, err := mount.IsNotMountPoint(mounter, targetPath) if err != nil { if os.IsNotExist(err) { if err = os.MkdirAll(targetPath, 0750); err != nil { return false, err } notMnt = true + } else if mount.IsCorruptedMnt(err) { + if err := mounter.Unmount(targetPath); err != nil { + return false, err + } + notMnt, err = mount.IsNotMountPoint(mounter, targetPath) } else { return false, err } @@ -80,22 +84,19 @@ func checkMount(targetPath string) (bool, error) { } type KeyMutex struct { - mutexes []sync.RWMutex - size int32 + mutexes sync.Map } -func NewKeyMutex(size int32) *KeyMutex { - return &KeyMutex{ - mutexes: make([]sync.RWMutex, size), - size: size, - } +func NewKeyMutex() *KeyMutex { + return &KeyMutex{} } -func (km *KeyMutex) GetMutex(key string) *sync.RWMutex { - index := util.HashToInt32([]byte(key)) - if index < 0 { - index = -index - } +func (km *KeyMutex) GetMutex(key string) *sync.Mutex { + m, _ := km.mutexes.LoadOrStore(key, &sync.Mutex{}) + + return m.(*sync.Mutex) +} - return &km.mutexes[index%km.size] +func (km *KeyMutex) RemoveMutex(key string) { + km.mutexes.Delete(key) } diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index c7d8f64..29a2e8c 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -15,10 +15,8 @@ import ( type Volume struct { VolumeId string - // volume's real mount point - stagingTargetPath string - mounter Mounter + mount Mount // unix socket used to manage volume localSocket string @@ -26,33 +24,30 @@ type Volume struct { func NewVolume(volumeID string, mounter Mounter) *Volume { return &Volume{ - VolumeId: volumeID, - mounter: mounter, + VolumeId: volumeID, + mounter: mounter, + localSocket: GetLocalSocket(volumeID), } } func (vol *Volume) Stage(stagingTargetPath string) error { - if vol.isStaged() { - return nil - } - // check whether it can be mounted if notMnt, err := checkMount(stagingTargetPath); err != nil { return err } else if !notMnt { - // maybe already mounted? - return nil + // try to unmount before mounting again + _ = mount.New("").Unmount(stagingTargetPath) } - if err := vol.mounter.Mount(stagingTargetPath); err != nil { + if m, err := vol.mounter.Mount(stagingTargetPath); err == nil { + vol.mount = m + return nil + } else { return err } - - vol.stagingTargetPath = stagingTargetPath - return nil } -func (vol *Volume) Publish(targetPath string, readOnly bool) error { +func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly bool) error { // check whether it can be mounted if notMnt, err := checkMount(targetPath); err != nil { return err @@ -68,7 +63,7 @@ func (vol *Volume) Publish(targetPath string, readOnly bool) error { } mounter := mount.New("") - if err := mounter.Mount(vol.stagingTargetPath, targetPath, "", mountOptions); err != nil { + if err := mounter.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil { return err } @@ -76,11 +71,7 @@ func (vol *Volume) Publish(targetPath string, readOnly bool) error { } func (vol *Volume) Expand(sizeByte int64) error { - if !vol.isStaged() { - return nil - } - - target := fmt.Sprintf("passthrough:///unix://%s", vol.getLocalSocket()) + target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) clientConn, err := grpc.Dial(target, dialOption) @@ -106,36 +97,21 @@ func (vol *Volume) Unpublish(targetPath string) error { return nil } -func (vol *Volume) Unstage(_ string) error { - if !vol.isStaged() { +func (vol *Volume) Unstage(stagingTargetPath string) error { + glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath) + + if vol.mount == nil { + glog.Errorf("volume is not mounted: %s, path", vol.VolumeId, stagingTargetPath) return nil } - mountPoint := vol.stagingTargetPath - glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, mountPoint) - - if err := fuseUnmount(mountPoint); err != nil { + if err := vol.mount.Unmount(); err != nil { return err } - if err := os.Remove(mountPoint); err != nil && !os.IsNotExist(err) { + if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) { return err } return nil } - -func (vol *Volume) isStaged() bool { - return vol.stagingTargetPath != "" -} - -func (vol *Volume) getLocalSocket() string { - if vol.localSocket != "" { - return vol.localSocket - } - - socket := GetLocalSocket(vol.VolumeId) - - vol.localSocket = socket - return socket -} |
