aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver
diff options
context:
space:
mode:
authorViktor Kuzmin <kvaster@gmail.com>2022-09-03 11:40:32 +0300
committerViktor Kuzmin <kvaster@gmail.com>2022-09-03 11:40:32 +0300
commitbc49fc11ffe8c687c5d4ea46f25ead9191155ec0 (patch)
tree901e66a4e89fb8d23648ca51c438f74c13bfcc35 /pkg/driver
parent3e3d202acdc9cb523ae3f7e71b3e1b595f4d5450 (diff)
downloadseaweedfs-csi-driver-bc49fc11ffe8c687c5d4ea46f25ead9191155ec0.tar.xz
seaweedfs-csi-driver-bc49fc11ffe8c687c5d4ea46f25ead9191155ec0.zip
Remove linux specific magic from fuse process start/stop.
Use pid from cmd.Process instead of /proc lookup Use mount specific mutex Log fuse mount process stderr and stdout for problems investigation
Diffstat (limited to 'pkg/driver')
-rw-r--r--pkg/driver/mount_util.go60
-rw-r--r--pkg/driver/mounter.go91
-rw-r--r--pkg/driver/mounter_seaweedfs.go6
-rw-r--r--pkg/driver/nodeserver.go31
-rw-r--r--pkg/driver/utils.go35
-rw-r--r--pkg/driver/volume.go64
6 files changed, 131 insertions, 156 deletions
diff --git a/pkg/driver/mount_util.go b/pkg/driver/mount_util.go
index af72840..561a499 100644
--- a/pkg/driver/mount_util.go
+++ b/pkg/driver/mount_util.go
@@ -2,43 +2,11 @@ package driver
import (
"errors"
- "fmt"
- "io/ioutil"
- "os"
- "strings"
- "syscall"
"time"
- "github.com/mitchellh/go-ps"
- "github.com/seaweedfs/seaweedfs/weed/glog"
"k8s.io/utils/mount"
)
-func waitForProcess(p *os.Process, backoff int) error {
- if backoff == 20 {
- return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
- }
- cmdLine, err := getCmdLine(p.Pid)
- if err != nil {
- glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err)
- return nil
- }
- if cmdLine == "" {
- // ignore defunct processes
- // TODO: debug why this happens in the first place
- // seems to only happen on k8s, not on local docker
- glog.Warning("Fuse process seems dead, returning")
- return nil
- }
- if err := p.Signal(syscall.Signal(0)); err != nil {
- glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
- return nil
- }
- glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
- time.Sleep(time.Duration(backoff*100) * time.Millisecond)
- return waitForProcess(p, backoff+1)
-}
-
func waitForMount(path string, timeout time.Duration) error {
var elapsed time.Duration
var interval = 10 * time.Millisecond
@@ -57,31 +25,3 @@ func waitForMount(path string, timeout time.Duration) error {
}
}
}
-
-func findFuseMountProcess(path string) (*os.Process, error) {
- processes, err := ps.Processes()
- if err != nil {
- return nil, err
- }
- for _, p := range processes {
- cmdLine, err := getCmdLine(p.Pid())
- if err != nil {
- glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
- continue
- }
- if strings.Contains(cmdLine, path) {
- glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
- return os.FindProcess(p.Pid())
- }
- }
- return nil, nil
-}
-
-func getCmdLine(pid int) (string, error) {
- cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
- cmdLine, err := ioutil.ReadFile(cmdLineFile)
- if err != nil {
- return "", err
- }
- return string(cmdLine), nil
-}
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index 53ac26a..38baa26 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -1,7 +1,10 @@
package driver
import (
+ "context"
"fmt"
+ "os"
+ "syscall"
"time"
"os/exec"
@@ -16,8 +19,19 @@ type Config struct {
Filer string
}
+type Mount interface {
+ Unmount() error
+}
+
type Mounter interface {
- Mount(target string) error
+ Mount(target string) (Mount, error)
+}
+
+type fuseMount struct {
+ path string
+ cmd *exec.Cmd
+
+ finished chan struct{}
}
func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
@@ -34,52 +48,83 @@ func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volCont
return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext)
}
-func fuseMount(path string, command string, args []string) error {
+func newFuseMount(path string, command string, args []string) (Mount, error) {
cmd := exec.Command(command, args...)
glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args)
+ // log fuse process messages - we need an easy way to investigate crashes in case it happens
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stdout
+
err := cmd.Start()
if err != nil {
glog.Errorf("running weed mount: %v", err)
- return fmt.Errorf("Error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err)
+ return nil, fmt.Errorf("Error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err)
+ }
+
+ m := &fuseMount{
+ path: path,
+ cmd: cmd,
+
+ finished: make(chan struct{}),
}
// avoid zombie processes
go func() {
if err := cmd.Wait(); err != nil {
- glog.Errorf("weed mount process %d exit: %v", cmd.Process.Pid, err)
+ glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err)
+ } else {
+ glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path)
}
+
+ close(m.finished)
}()
- return waitForMount(path, 10*time.Second)
+ if err = waitForMount(path, 10*time.Second); err != nil {
+ glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path)
+
+ _ = m.finish(time.Second * 10)
+ return nil, err
+ } else {
+ return m, nil
+ }
}
-func fuseUnmount(path string) error {
- m := mount.New("")
+func (fm *fuseMount) finish(timeout time.Duration) error {
+ // ignore error, just inform we want process to exit
+ _ = fm.cmd.Process.Signal(syscall.Signal(1))
- if ok, err := m.IsLikelyNotMountPoint(path); !ok || mount.IsCorruptedMnt(err) {
- if err := m.Unmount(path); err != nil {
+ if err := fm.waitFinished(timeout); err != nil {
+ glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fm.cmd.Process.Pid, fm.path)
+ _ = fm.cmd.Process.Kill()
+ if err = fm.waitFinished(time.Second * 1); err != nil {
+ glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fm.cmd.Process.Pid, fm.path)
return err
}
}
- // as fuse quits immediately, we will try to wait until the process is done
- process, err := findFuseMountProcess(path)
- if err != nil {
- glog.Errorf("Error getting PID of fuse mount: %s", err)
- return nil
- }
- if process == nil {
- glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
+ return nil
+}
+
+func (fm *fuseMount) waitFinished(timeout time.Duration) error {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ return context.DeadlineExceeded
+ case <-fm.finished:
return nil
}
- glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
- return waitForProcess(process, 1)
}
-func newConfigFromSecrets(secrets map[string]string) *Config {
- t := &Config{
- Filer: secrets["filer"],
+func (fm *fuseMount) Unmount() error {
+ m := mount.New("")
+
+ if ok, err := m.IsLikelyNotMountPoint(fm.path); !ok || mount.IsCorruptedMnt(err) {
+ if err := m.Unmount(fm.path); err != nil {
+ return err
+ }
}
- return t
+
+ return fm.finish(time.Second * 30)
}
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index 641ad1f..c9620f8 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -34,7 +34,7 @@ func newSeaweedFsMounter(volumeID string, path string, collection string, readOn
}, nil
}
-func (seaweedFs *seaweedFsMounter) Mount(target string) error {
+func (seaweedFs *seaweedFsMounter) Mount(target string) (Mount, error) {
glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target)
var filers []string
@@ -91,11 +91,11 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
args = append(args, fmt.Sprintf("-map.gid=%s", seaweedFs.driver.GidMap))
}
- err := fuseMount(target, seaweedFsCmd, args)
+ m, err := newFuseMount(target, seaweedFsCmd, args)
if err != nil {
glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
}
- return err
+ return m, err
}
func GetLocalSocket(volumeID string) string {
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 38ba34f..3f18202 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -60,11 +60,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
+ // node stage is unsuccessfull
+ ns.removeVolumeMutex(volumeID)
return nil, err
}
volume := NewVolume(volumeID, mounter)
if err := volume.Stage(stagingTargetPath); err != nil {
+ // node stage is unsuccessfull
+ ns.removeVolumeMutex(volumeID)
+
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
@@ -83,6 +88,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
+ stagingTargetPath := req.GetStagingTargetPath()
glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath)
@@ -99,6 +105,9 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
+ if stagingTargetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
+ }
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
@@ -111,7 +120,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
// When pod uses a volume in read-only mode, k8s will automatically
// mount the volume as a read-only file system.
- if err := volume.(*Volume).Publish(targetPath, req.GetReadonly()); err != nil {
+ if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -215,16 +224,16 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// make sure there is no any garbage
mounter := mount.New("")
_ = mount.CleanupMountPoint(stagingTargetPath, mounter, true)
-
- return &csi.NodeUnstageVolumeResponse{}, nil
- }
-
- if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
- return nil, status.Error(codes.Internal, err.Error())
} else {
- ns.volumes.Delete(volumeID)
+ if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ } else {
+ ns.volumes.Delete(volumeID)
+ }
}
+ // remove mutex on successfull unstage
+ ns.volumeMutexes.RemoveMutex(volumeID)
return &csi.NodeUnstageVolumeResponse{}, nil
}
@@ -246,10 +255,14 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return &csi.NodeExpandVolumeResponse{}, nil
}
-func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex {
+func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex {
return ns.volumeMutexes.GetMutex(volumeID)
}
+func (ns *NodeServer) removeVolumeMutex(volumeID string) {
+ ns.volumeMutexes.RemoveMutex(volumeID)
+}
+
func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
mode := req.GetVolumeCapability().GetAccessMode().Mode
diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go
index c2e0c49..485d4da 100644
--- a/pkg/driver/utils.go
+++ b/pkg/driver/utils.go
@@ -8,17 +8,15 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/utils/mount"
)
func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
-
return &NodeServer{
Driver: n,
- volumeMutexes: NewKeyMutex(32),
+ volumeMutexes: NewKeyMutex(),
}
}
@@ -65,13 +63,19 @@ func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h
}
func checkMount(targetPath string) (bool, error) {
- notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
+ mounter := mount.New("")
+ notMnt, err := mount.IsNotMountPoint(mounter, targetPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return false, err
}
notMnt = true
+ } else if mount.IsCorruptedMnt(err) {
+ if err := mounter.Unmount(targetPath); err != nil {
+ return false, err
+ }
+ notMnt, err = mount.IsNotMountPoint(mounter, targetPath)
} else {
return false, err
}
@@ -80,22 +84,19 @@ func checkMount(targetPath string) (bool, error) {
}
type KeyMutex struct {
- mutexes []sync.RWMutex
- size int32
+ mutexes sync.Map
}
-func NewKeyMutex(size int32) *KeyMutex {
- return &KeyMutex{
- mutexes: make([]sync.RWMutex, size),
- size: size,
- }
+func NewKeyMutex() *KeyMutex {
+ return &KeyMutex{}
}
-func (km *KeyMutex) GetMutex(key string) *sync.RWMutex {
- index := util.HashToInt32([]byte(key))
- if index < 0 {
- index = -index
- }
+func (km *KeyMutex) GetMutex(key string) *sync.Mutex {
+ m, _ := km.mutexes.LoadOrStore(key, &sync.Mutex{})
+
+ return m.(*sync.Mutex)
+}
- return &km.mutexes[index%km.size]
+func (km *KeyMutex) RemoveMutex(key string) {
+ km.mutexes.Delete(key)
}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index c7d8f64..29a2e8c 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -15,10 +15,8 @@ import (
type Volume struct {
VolumeId string
- // volume's real mount point
- stagingTargetPath string
-
mounter Mounter
+ mount Mount
// unix socket used to manage volume
localSocket string
@@ -26,33 +24,30 @@ type Volume struct {
func NewVolume(volumeID string, mounter Mounter) *Volume {
return &Volume{
- VolumeId: volumeID,
- mounter: mounter,
+ VolumeId: volumeID,
+ mounter: mounter,
+ localSocket: GetLocalSocket(volumeID),
}
}
func (vol *Volume) Stage(stagingTargetPath string) error {
- if vol.isStaged() {
- return nil
- }
-
// check whether it can be mounted
if notMnt, err := checkMount(stagingTargetPath); err != nil {
return err
} else if !notMnt {
- // maybe already mounted?
- return nil
+ // try to unmount before mounting again
+ _ = mount.New("").Unmount(stagingTargetPath)
}
- if err := vol.mounter.Mount(stagingTargetPath); err != nil {
+ if m, err := vol.mounter.Mount(stagingTargetPath); err == nil {
+ vol.mount = m
+ return nil
+ } else {
return err
}
-
- vol.stagingTargetPath = stagingTargetPath
- return nil
}
-func (vol *Volume) Publish(targetPath string, readOnly bool) error {
+func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly bool) error {
// check whether it can be mounted
if notMnt, err := checkMount(targetPath); err != nil {
return err
@@ -68,7 +63,7 @@ func (vol *Volume) Publish(targetPath string, readOnly bool) error {
}
mounter := mount.New("")
- if err := mounter.Mount(vol.stagingTargetPath, targetPath, "", mountOptions); err != nil {
+ if err := mounter.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil {
return err
}
@@ -76,11 +71,7 @@ func (vol *Volume) Publish(targetPath string, readOnly bool) error {
}
func (vol *Volume) Expand(sizeByte int64) error {
- if !vol.isStaged() {
- return nil
- }
-
- target := fmt.Sprintf("passthrough:///unix://%s", vol.getLocalSocket())
+ target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
clientConn, err := grpc.Dial(target, dialOption)
@@ -106,36 +97,21 @@ func (vol *Volume) Unpublish(targetPath string) error {
return nil
}
-func (vol *Volume) Unstage(_ string) error {
- if !vol.isStaged() {
+func (vol *Volume) Unstage(stagingTargetPath string) error {
+ glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath)
+
+ if vol.mount == nil {
+ glog.Errorf("volume is not mounted: %s, path", vol.VolumeId, stagingTargetPath)
return nil
}
- mountPoint := vol.stagingTargetPath
- glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, mountPoint)
-
- if err := fuseUnmount(mountPoint); err != nil {
+ if err := vol.mount.Unmount(); err != nil {
return err
}
- if err := os.Remove(mountPoint); err != nil && !os.IsNotExist(err) {
+ if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
-
-func (vol *Volume) isStaged() bool {
- return vol.stagingTargetPath != ""
-}
-
-func (vol *Volume) getLocalSocket() string {
- if vol.localSocket != "" {
- return vol.localSocket
- }
-
- socket := GetLocalSocket(vol.VolumeId)
-
- vol.localSocket = socket
- return socket
-}