diff options
| author | 泽淼 周 <zhouzemiao@ihuman.com> | 2025-09-26 19:54:44 +0800 |
|---|---|---|
| committer | chrislusf <chris.lu@gmail.com> | 2025-12-05 17:51:28 -0800 |
| commit | bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd (patch) | |
| tree | 7a60daa0bc9cf86204ddeddc94bc7dc748483cb5 /pkg/driver/mounter.go | |
| parent | fd2b35494095ccf7b06fb210305406f83ed17998 (diff) | |
| download | seaweedfs-csi-driver-bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd.tar.xz seaweedfs-csi-driver-bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd.zip | |
feat: Separated weed mount lifecycle into a dedicated service and rewired the CSI components to call it.
Diffstat (limited to 'pkg/driver/mounter.go')
| -rw-r--r-- | pkg/driver/mounter.go | 150 |
1 files changed, 56 insertions, 94 deletions
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index 756f671..6beee55 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -1,24 +1,12 @@ package driver import ( - "context" "fmt" - "os" - "syscall" - "time" - - "os/exec" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" - "k8s.io/mount-utils" ) -// Config holds values to configure the driver -type Config struct { - // Region string - Filer string -} - type Unmounter interface { Unmount() error } @@ -27,107 +15,81 @@ type Mounter interface { Mount(target string) (Unmounter, error) } -type fuseUnmounter struct { - path string - cmd *exec.Cmd +type mountServiceMounter struct { + driver *SeaweedFsDriver + volumeID string + readOnly bool + volContext map[string]string + client *mountmanager.Client +} - finished chan struct{} +type mountServiceUnmounter struct { + client *mountmanager.Client + volumeID string } func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - path, ok := volContext["path"] - if !ok { - path = fmt.Sprintf("/buckets/%s", volumeID) + client, err := mountmanager.NewClient(driver.mountEndpoint) + if err != nil { + return nil, err } - collection, ok := volContext["collection"] - if !ok { - collection = volumeID + contextCopy := make(map[string]string, len(volContext)) + for k, v := range volContext { + contextCopy[k] = v } - return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext) + return &mountServiceMounter{ + driver: driver, + volumeID: volumeID, + readOnly: readOnly, + volContext: contextCopy, + client: client, + }, nil } -func fuseMount(path string, command string, args []string) (Unmounter, error) { - cmd := exec.Command(command, args...) - glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args) - - // log fuse process messages - we need an easy way to investigate crashes in case it happens - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout - - err := cmd.Start() - if err != nil { - glog.Errorf("running weed mount: %v", err) - return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err) +func (m *mountServiceMounter) Mount(target string) (Unmounter, error) { + if target == "" { + return nil, fmt.Errorf("target path is required") } - fu := &fuseUnmounter{ - path: path, - cmd: cmd, - - finished: make(chan struct{}), + filers := make([]string, len(m.driver.filers)) + for i, address := range m.driver.filers { + filers[i] = string(address) } - // avoid zombie processes - go func() { - if err := cmd.Wait(); err != nil { - glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err) - } else { - glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path) - } - - // make sure we'll have no stale mounts - time.Sleep(time.Millisecond * 100) - _ = mountutil.Unmount(path) - - close(fu.finished) - }() - - if err = waitForMount(path, 10*time.Second); err != nil { - glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path) + req := &mountmanager.MountRequest{ + VolumeID: m.volumeID, + TargetPath: target, + ReadOnly: m.readOnly, + Filers: filers, + CacheDir: m.driver.CacheDir, + CacheCapacityMB: m.driver.CacheCapacityMB, + ConcurrentWriters: m.driver.ConcurrentWriters, + UidMap: m.driver.UidMap, + GidMap: m.driver.GidMap, + DataCenter: m.driver.DataCenter, + DataLocality: m.driver.DataLocality.String(), + VolumeContext: m.volContext, + } - _ = fu.finish(time.Second * 10) + resp, err := m.client.Mount(req) + if err != nil { return nil, err - } else { - return fu, nil } -} -func (fu *fuseUnmounter) finish(timeout time.Duration) error { - // ignore error, just inform we want process to exit - // SIGHUP is used to reload weed config - we need to use SIGTERM - _ = fu.cmd.Process.Signal(syscall.SIGTERM) - - if err := fu.waitFinished(timeout); err != nil { - glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - _ = fu.cmd.Process.Kill() - if err = fu.waitFinished(time.Second * 1); err != nil { - glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - return err - } + expectedSocket := mountmanager.LocalSocketPath(m.volumeID) + if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket { + glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket) } - return nil + return &mountServiceUnmounter{ + client: m.client, + volumeID: m.volumeID, + }, nil } -func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - select { - case <-ctx.Done(): - return context.DeadlineExceeded - case <-fu.finished: - return nil - } -} - -func (fu *fuseUnmounter) Unmount() error { - if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) { - if err := mountutil.Unmount(fu.path); err != nil { - return err - } - } - - return fu.finish(time.Second * 5) +func (u *mountServiceUnmounter) Unmount() error { + _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID}) + return err } |
