aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver/mounter.go
diff options
context:
space:
mode:
author泽淼 周 <zhouzemiao@ihuman.com>2025-09-26 19:54:44 +0800
committerChris Lu <chrislusf@users.noreply.github.com>2025-12-06 18:53:22 -0800
commit2828d5a05c36aa8719778142eb4472007906f14c (patch)
tree7a60daa0bc9cf86204ddeddc94bc7dc748483cb5 /pkg/driver/mounter.go
parentfd2b35494095ccf7b06fb210305406f83ed17998 (diff)
downloadseaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.tar.xz
seaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.zip
feat: Separated weed mount lifecycle into a dedicated service and rewired the CSI components to call it.
Diffstat (limited to 'pkg/driver/mounter.go')
-rw-r--r--pkg/driver/mounter.go150
1 files changed, 56 insertions, 94 deletions
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index 756f671..6beee55 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -1,24 +1,12 @@
package driver
import (
- "context"
"fmt"
- "os"
- "syscall"
- "time"
-
- "os/exec"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "k8s.io/mount-utils"
)
-// Config holds values to configure the driver
-type Config struct {
- // Region string
- Filer string
-}
-
type Unmounter interface {
Unmount() error
}
@@ -27,107 +15,81 @@ type Mounter interface {
Mount(target string) (Unmounter, error)
}
-type fuseUnmounter struct {
- path string
- cmd *exec.Cmd
+type mountServiceMounter struct {
+ driver *SeaweedFsDriver
+ volumeID string
+ readOnly bool
+ volContext map[string]string
+ client *mountmanager.Client
+}
- finished chan struct{}
+type mountServiceUnmounter struct {
+ client *mountmanager.Client
+ volumeID string
}
func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- path, ok := volContext["path"]
- if !ok {
- path = fmt.Sprintf("/buckets/%s", volumeID)
+ client, err := mountmanager.NewClient(driver.mountEndpoint)
+ if err != nil {
+ return nil, err
}
- collection, ok := volContext["collection"]
- if !ok {
- collection = volumeID
+ contextCopy := make(map[string]string, len(volContext))
+ for k, v := range volContext {
+ contextCopy[k] = v
}
- return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext)
+ return &mountServiceMounter{
+ driver: driver,
+ volumeID: volumeID,
+ readOnly: readOnly,
+ volContext: contextCopy,
+ client: client,
+ }, nil
}
-func fuseMount(path string, command string, args []string) (Unmounter, error) {
- cmd := exec.Command(command, args...)
- glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args)
-
- // log fuse process messages - we need an easy way to investigate crashes in case it happens
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stdout
-
- err := cmd.Start()
- if err != nil {
- glog.Errorf("running weed mount: %v", err)
- return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err)
+func (m *mountServiceMounter) Mount(target string) (Unmounter, error) {
+ if target == "" {
+ return nil, fmt.Errorf("target path is required")
}
- fu := &fuseUnmounter{
- path: path,
- cmd: cmd,
-
- finished: make(chan struct{}),
+ filers := make([]string, len(m.driver.filers))
+ for i, address := range m.driver.filers {
+ filers[i] = string(address)
}
- // avoid zombie processes
- go func() {
- if err := cmd.Wait(); err != nil {
- glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err)
- } else {
- glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path)
- }
-
- // make sure we'll have no stale mounts
- time.Sleep(time.Millisecond * 100)
- _ = mountutil.Unmount(path)
-
- close(fu.finished)
- }()
-
- if err = waitForMount(path, 10*time.Second); err != nil {
- glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path)
+ req := &mountmanager.MountRequest{
+ VolumeID: m.volumeID,
+ TargetPath: target,
+ ReadOnly: m.readOnly,
+ Filers: filers,
+ CacheDir: m.driver.CacheDir,
+ CacheCapacityMB: m.driver.CacheCapacityMB,
+ ConcurrentWriters: m.driver.ConcurrentWriters,
+ UidMap: m.driver.UidMap,
+ GidMap: m.driver.GidMap,
+ DataCenter: m.driver.DataCenter,
+ DataLocality: m.driver.DataLocality.String(),
+ VolumeContext: m.volContext,
+ }
- _ = fu.finish(time.Second * 10)
+ resp, err := m.client.Mount(req)
+ if err != nil {
return nil, err
- } else {
- return fu, nil
}
-}
-func (fu *fuseUnmounter) finish(timeout time.Duration) error {
- // ignore error, just inform we want process to exit
- // SIGHUP is used to reload weed config - we need to use SIGTERM
- _ = fu.cmd.Process.Signal(syscall.SIGTERM)
-
- if err := fu.waitFinished(timeout); err != nil {
- glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- _ = fu.cmd.Process.Kill()
- if err = fu.waitFinished(time.Second * 1); err != nil {
- glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- return err
- }
+ expectedSocket := mountmanager.LocalSocketPath(m.volumeID)
+ if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket {
+ glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket)
}
- return nil
+ return &mountServiceUnmounter{
+ client: m.client,
+ volumeID: m.volumeID,
+ }, nil
}
-func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- select {
- case <-ctx.Done():
- return context.DeadlineExceeded
- case <-fu.finished:
- return nil
- }
-}
-
-func (fu *fuseUnmounter) Unmount() error {
- if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) {
- if err := mountutil.Unmount(fu.path); err != nil {
- return err
- }
- }
-
- return fu.finish(time.Second * 5)
+func (u *mountServiceUnmounter) Unmount() error {
+ _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID})
+ return err
}