diff options
| author | 泽淼 周 <zhouzemiao@ihuman.com> | 2025-09-26 19:54:44 +0800 |
|---|---|---|
| committer | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-06 18:53:22 -0800 |
| commit | 2828d5a05c36aa8719778142eb4472007906f14c (patch) | |
| tree | 7a60daa0bc9cf86204ddeddc94bc7dc748483cb5 /pkg/driver | |
| parent | fd2b35494095ccf7b06fb210305406f83ed17998 (diff) | |
| download | seaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.tar.xz seaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.zip | |
feat: Separated weed mount lifecycle into a dedicated service and rewired the CSI components to call it.
Diffstat (limited to 'pkg/driver')
| -rw-r--r-- | pkg/driver/driver.go | 6 | ||||
| -rw-r--r-- | pkg/driver/mounter.go | 150 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 188 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 21 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 28 |
5 files changed, 75 insertions, 318 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 4e09a93..d162369 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -27,7 +27,8 @@ type SeaweedFsDriver struct { nodeID string version string - endpoint string + endpoint string + mountEndpoint string vcap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability @@ -48,7 +49,7 @@ type SeaweedFsDriver struct { RunController bool } -func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher bool) *SeaweedFsDriver { +func NewSeaweedFsDriver(name, filer, nodeID, endpoint, mountEndpoint string, enableAttacher bool) *SeaweedFsDriver { glog.Infof("Driver: %v version: %v", name, version) @@ -56,6 +57,7 @@ func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher boo n := &SeaweedFsDriver{ endpoint: endpoint, + mountEndpoint: mountEndpoint, nodeID: nodeID, name: name, version: version, diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index 756f671..6beee55 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -1,24 +1,12 @@ package driver import ( - "context" "fmt" - "os" - "syscall" - "time" - - "os/exec" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" - "k8s.io/mount-utils" ) -// Config holds values to configure the driver -type Config struct { - // Region string - Filer string -} - type Unmounter interface { Unmount() error } @@ -27,107 +15,81 @@ type Mounter interface { Mount(target string) (Unmounter, error) } -type fuseUnmounter struct { - path string - cmd *exec.Cmd +type mountServiceMounter struct { + driver *SeaweedFsDriver + volumeID string + readOnly bool + volContext map[string]string + client *mountmanager.Client +} - finished chan struct{} +type mountServiceUnmounter struct { + client *mountmanager.Client + volumeID string } func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - path, ok := volContext["path"] - if !ok { - path = fmt.Sprintf("/buckets/%s", volumeID) + client, err := mountmanager.NewClient(driver.mountEndpoint) + if err != nil { + return nil, err } - collection, ok := volContext["collection"] - if !ok { - collection = volumeID + contextCopy := make(map[string]string, len(volContext)) + for k, v := range volContext { + contextCopy[k] = v } - return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext) + return &mountServiceMounter{ + driver: driver, + volumeID: volumeID, + readOnly: readOnly, + volContext: contextCopy, + client: client, + }, nil } -func fuseMount(path string, command string, args []string) (Unmounter, error) { - cmd := exec.Command(command, args...) - glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args) - - // log fuse process messages - we need an easy way to investigate crashes in case it happens - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout - - err := cmd.Start() - if err != nil { - glog.Errorf("running weed mount: %v", err) - return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err) +func (m *mountServiceMounter) Mount(target string) (Unmounter, error) { + if target == "" { + return nil, fmt.Errorf("target path is required") } - fu := &fuseUnmounter{ - path: path, - cmd: cmd, - - finished: make(chan struct{}), + filers := make([]string, len(m.driver.filers)) + for i, address := range m.driver.filers { + filers[i] = string(address) } - // avoid zombie processes - go func() { - if err := cmd.Wait(); err != nil { - glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err) - } else { - glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path) - } - - // make sure we'll have no stale mounts - time.Sleep(time.Millisecond * 100) - _ = mountutil.Unmount(path) - - close(fu.finished) - }() - - if err = waitForMount(path, 10*time.Second); err != nil { - glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path) + req := &mountmanager.MountRequest{ + VolumeID: m.volumeID, + TargetPath: target, + ReadOnly: m.readOnly, + Filers: filers, + CacheDir: m.driver.CacheDir, + CacheCapacityMB: m.driver.CacheCapacityMB, + ConcurrentWriters: m.driver.ConcurrentWriters, + UidMap: m.driver.UidMap, + GidMap: m.driver.GidMap, + DataCenter: m.driver.DataCenter, + DataLocality: m.driver.DataLocality.String(), + VolumeContext: m.volContext, + } - _ = fu.finish(time.Second * 10) + resp, err := m.client.Mount(req) + if err != nil { return nil, err - } else { - return fu, nil } -} -func (fu *fuseUnmounter) finish(timeout time.Duration) error { - // ignore error, just inform we want process to exit - // SIGHUP is used to reload weed config - we need to use SIGTERM - _ = fu.cmd.Process.Signal(syscall.SIGTERM) - - if err := fu.waitFinished(timeout); err != nil { - glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - _ = fu.cmd.Process.Kill() - if err = fu.waitFinished(time.Second * 1); err != nil { - glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - return err - } + expectedSocket := mountmanager.LocalSocketPath(m.volumeID) + if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket { + glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket) } - return nil + return &mountServiceUnmounter{ + client: m.client, + volumeID: m.volumeID, + }, nil } -func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - select { - case <-ctx.Done(): - return context.DeadlineExceeded - case <-fu.finished: - return nil - } -} - -func (fu *fuseUnmounter) Unmount() error { - if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) { - if err := mountutil.Unmount(fu.path); err != nil { - return err - } - } - - return fu.finish(time.Second * 5) +func (u *mountServiceUnmounter) Unmount() error { + _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID}) + return err } diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go deleted file mode 100644 index 56ba98b..0000000 --- a/pkg/driver/mounter_seaweedfs.go +++ /dev/null @@ -1,188 +0,0 @@ -package driver - -import ( - "fmt" - "os" - "path/filepath" - "strings" - - "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/util" -) - -// Implements Mounter -type seaweedFsMounter struct { - volumeID string - path string - collection string - readOnly bool - driver *SeaweedFsDriver - volContext map[string]string -} - -type seaweedFsUnmounter struct { - unmounter Unmounter - cacheDir string -} - -const ( - seaweedFsCmd = "weed" -) - -func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - return &seaweedFsMounter{ - volumeID: volumeID, - path: path, - collection: collection, - readOnly: readOnly, - driver: driver, - volContext: volContext, - }, nil -} - -func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { - glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target) - - var filers []string - for _, address := range seaweedFs.driver.filers { - filers = append(filers, string(address)) - } - - // CacheDirForRead should be always defined - we use temp dir in case it is not defined - // we need to use predictable cache path, because we need to clean it up on unstage - cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID) - - // Final args - args := []string{ - "-logtostderr=true", - "mount", - "-dirAutoCreate=true", - "-umask=000", - fmt.Sprintf("-dir=%s", target), - fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)), - fmt.Sprintf("-cacheDir=%s", cacheDir), - } - - if seaweedFs.readOnly { - args = append(args, "-readOnly") - } - - // Values for override-able args - // Whitelist for merging with volContext - argsMap := map[string]string{ - "collection": seaweedFs.collection, - "filer": strings.Join(filers, ","), - "filer.path": seaweedFs.path, - "cacheCapacityMB": fmt.Sprint(seaweedFs.driver.CacheCapacityMB), - "concurrentWriters": fmt.Sprint(seaweedFs.driver.ConcurrentWriters), - "map.uid": seaweedFs.driver.UidMap, - "map.gid": seaweedFs.driver.GidMap, - "disk": "", - "dataCenter": "", - "replication": "", - "ttl": "", - "chunkSizeLimitMB": "", - "volumeServerAccess": "", - "readRetryTime": "", - } - - // Handle DataLocality - dataLocality := seaweedFs.driver.DataLocality - // Try to override when set in context - if dataLocalityStr, ok := seaweedFs.volContext["dataLocality"]; ok { - // Convert to enum - dataLocalityRes, ok := datalocality.FromString(dataLocalityStr) - if !ok { - glog.Warning("volumeContext 'dataLocality' invalid") - } else { - dataLocality = dataLocalityRes - } - } - if err := CheckDataLocality(&dataLocality, &seaweedFs.driver.DataCenter); err != nil { - return nil, err - } - // Settings based on type - switch dataLocality { - case datalocality.Write_preferLocalDc: - argsMap["dataCenter"] = seaweedFs.driver.DataCenter - } - - // volContext-parameter -> mount-arg - parameterArgMap := map[string]string{ - "uidMap": "map.uid", - "gidMap": "map.gid", - "filerPath": "filer.path", - // volumeContext has "diskType", but mount-option is "disk", converting for backwards compatability - "diskType": "disk", - } - - // Explicitly ignored volContext args e.g. handled somewhere else - ignoreArgs := []string{ - "dataLocality", - } - - // Merge volContext into argsMap with key-mapping - for arg, value := range seaweedFs.volContext { - if in_arr(ignoreArgs, arg) { - continue - } - - // Check if key-mapping exists - newArg, ok := parameterArgMap[arg] - if ok { - arg = newArg - } - - // Check if arg can be applied - if _, ok := argsMap[arg]; !ok { - glog.Warningf("VolumeContext '%s' ignored", arg) - continue - } - - // Write to args-map - argsMap[arg] = value - } - - // Convert Args-Map to args - for arg, value := range argsMap { - if value != "" { // ignore empty values - args = append(args, fmt.Sprintf("-%s=%s", arg, value)) - } - } - - u, err := fuseMount(target, seaweedFsCmd, args) - if err != nil { - glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err) - } - - return &seaweedFsUnmounter{unmounter: u, cacheDir: cacheDir}, err -} - -func (su *seaweedFsUnmounter) Unmount() error { - err := su.unmounter.Unmount() - err2 := os.RemoveAll(su.cacheDir) - if err2 != nil { - glog.Warningf("error removing cache from: %s, err: %v", su.cacheDir, err2) - } - return err -} - -func GetLocalSocket(volumeID string) string { - montDirHash := util.HashToInt32([]byte(volumeID)) - if montDirHash < 0 { - montDirHash = -montDirHash - } - - socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash) - return socket -} - -func in_arr(arr []string, val string) bool { - for _, v := range arr { - if val == v { - return true - } - } - return false -} diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 21e68bf..9371d09 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -185,11 +185,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // Note: The returned Volume won't have an unmounter, so Unstage will need special handling. func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume { return &Volume{ - VolumeId: volumeID, - StagedPath: stagingPath, - localSocket: GetLocalSocket(volumeID), + VolumeId: volumeID, + StagedPath: stagingPath, + driver: ns.Driver, // mounter and unmounter are nil - this is intentional // The FUSE process is already running, we just need to track the volume + // The mount service will have the mount tracked if it's still alive } } @@ -344,17 +345,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV } func (ns *NodeServer) NodeCleanup() { - ns.volumes.Range(func(_, vol any) bool { - v := vol.(*Volume) - if len(v.StagedPath) > 0 { - glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) - err := v.Unstage(v.StagedPath) - if err != nil { - glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) - } - } - return true - }) + glog.Infof("node cleanup skipped; mount service retains mounts across restarts") } func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex { @@ -373,7 +364,7 @@ func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volCont return nil, err } - volume := NewVolume(volumeID, mounter) + volume := NewVolume(volumeID, mounter, ns.Driver) if err := volume.Stage(stagingTargetPath); err != nil { return nil, err } diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 6cc777b..080595e 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -1,14 +1,10 @@ package driver import ( - "context" - "fmt" "os" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "k8s.io/mount-utils" ) @@ -18,16 +14,14 @@ type Volume struct { mounter Mounter unmounter Unmounter - - // unix socket used to manage volume - localSocket string + driver *SeaweedFsDriver } -func NewVolume(volumeID string, mounter Mounter) *Volume { +func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume { return &Volume{ - VolumeId: volumeID, - mounter: mounter, - localSocket: GetLocalSocket(volumeID), + VolumeId: volumeID, + mounter: mounter, + driver: driver, } } @@ -81,22 +75,18 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly } func (vol *Volume) Quota(sizeByte int64) error { - target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) - dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - - clientConn, err := grpc.Dial(target, dialOption) + client, err := mountmanager.NewClient(vol.driver.mountEndpoint) if err != nil { return err } - defer clientConn.Close() // We can't create PV of zero size, so we're using quota of 1 byte to define no quota. if sizeByte == 1 { sizeByte = 0 } - client := mount_pb.NewSeaweedMountClient(clientConn) - _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ + _, err = client.Configure(&mountmanager.ConfigureRequest{ + VolumeID: vol.VolumeId, CollectionCapacity: sizeByte, }) return err |
