aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/driver')
-rw-r--r--pkg/driver/driver.go6
-rw-r--r--pkg/driver/mounter.go150
-rw-r--r--pkg/driver/mounter_seaweedfs.go188
-rw-r--r--pkg/driver/nodeserver.go21
-rw-r--r--pkg/driver/volume.go28
5 files changed, 75 insertions, 318 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 4e09a93..d162369 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -27,7 +27,8 @@ type SeaweedFsDriver struct {
nodeID string
version string
- endpoint string
+ endpoint string
+ mountEndpoint string
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
@@ -48,7 +49,7 @@ type SeaweedFsDriver struct {
RunController bool
}
-func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher bool) *SeaweedFsDriver {
+func NewSeaweedFsDriver(name, filer, nodeID, endpoint, mountEndpoint string, enableAttacher bool) *SeaweedFsDriver {
glog.Infof("Driver: %v version: %v", name, version)
@@ -56,6 +57,7 @@ func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher boo
n := &SeaweedFsDriver{
endpoint: endpoint,
+ mountEndpoint: mountEndpoint,
nodeID: nodeID,
name: name,
version: version,
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index 756f671..6beee55 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -1,24 +1,12 @@
package driver
import (
- "context"
"fmt"
- "os"
- "syscall"
- "time"
-
- "os/exec"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "k8s.io/mount-utils"
)
-// Config holds values to configure the driver
-type Config struct {
- // Region string
- Filer string
-}
-
type Unmounter interface {
Unmount() error
}
@@ -27,107 +15,81 @@ type Mounter interface {
Mount(target string) (Unmounter, error)
}
-type fuseUnmounter struct {
- path string
- cmd *exec.Cmd
+type mountServiceMounter struct {
+ driver *SeaweedFsDriver
+ volumeID string
+ readOnly bool
+ volContext map[string]string
+ client *mountmanager.Client
+}
- finished chan struct{}
+type mountServiceUnmounter struct {
+ client *mountmanager.Client
+ volumeID string
}
func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- path, ok := volContext["path"]
- if !ok {
- path = fmt.Sprintf("/buckets/%s", volumeID)
+ client, err := mountmanager.NewClient(driver.mountEndpoint)
+ if err != nil {
+ return nil, err
}
- collection, ok := volContext["collection"]
- if !ok {
- collection = volumeID
+ contextCopy := make(map[string]string, len(volContext))
+ for k, v := range volContext {
+ contextCopy[k] = v
}
- return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext)
+ return &mountServiceMounter{
+ driver: driver,
+ volumeID: volumeID,
+ readOnly: readOnly,
+ volContext: contextCopy,
+ client: client,
+ }, nil
}
-func fuseMount(path string, command string, args []string) (Unmounter, error) {
- cmd := exec.Command(command, args...)
- glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args)
-
- // log fuse process messages - we need an easy way to investigate crashes in case it happens
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stdout
-
- err := cmd.Start()
- if err != nil {
- glog.Errorf("running weed mount: %v", err)
- return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err)
+func (m *mountServiceMounter) Mount(target string) (Unmounter, error) {
+ if target == "" {
+ return nil, fmt.Errorf("target path is required")
}
- fu := &fuseUnmounter{
- path: path,
- cmd: cmd,
-
- finished: make(chan struct{}),
+ filers := make([]string, len(m.driver.filers))
+ for i, address := range m.driver.filers {
+ filers[i] = string(address)
}
- // avoid zombie processes
- go func() {
- if err := cmd.Wait(); err != nil {
- glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err)
- } else {
- glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path)
- }
-
- // make sure we'll have no stale mounts
- time.Sleep(time.Millisecond * 100)
- _ = mountutil.Unmount(path)
-
- close(fu.finished)
- }()
-
- if err = waitForMount(path, 10*time.Second); err != nil {
- glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path)
+ req := &mountmanager.MountRequest{
+ VolumeID: m.volumeID,
+ TargetPath: target,
+ ReadOnly: m.readOnly,
+ Filers: filers,
+ CacheDir: m.driver.CacheDir,
+ CacheCapacityMB: m.driver.CacheCapacityMB,
+ ConcurrentWriters: m.driver.ConcurrentWriters,
+ UidMap: m.driver.UidMap,
+ GidMap: m.driver.GidMap,
+ DataCenter: m.driver.DataCenter,
+ DataLocality: m.driver.DataLocality.String(),
+ VolumeContext: m.volContext,
+ }
- _ = fu.finish(time.Second * 10)
+ resp, err := m.client.Mount(req)
+ if err != nil {
return nil, err
- } else {
- return fu, nil
}
-}
-func (fu *fuseUnmounter) finish(timeout time.Duration) error {
- // ignore error, just inform we want process to exit
- // SIGHUP is used to reload weed config - we need to use SIGTERM
- _ = fu.cmd.Process.Signal(syscall.SIGTERM)
-
- if err := fu.waitFinished(timeout); err != nil {
- glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- _ = fu.cmd.Process.Kill()
- if err = fu.waitFinished(time.Second * 1); err != nil {
- glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- return err
- }
+ expectedSocket := mountmanager.LocalSocketPath(m.volumeID)
+ if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket {
+ glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket)
}
- return nil
+ return &mountServiceUnmounter{
+ client: m.client,
+ volumeID: m.volumeID,
+ }, nil
}
-func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- select {
- case <-ctx.Done():
- return context.DeadlineExceeded
- case <-fu.finished:
- return nil
- }
-}
-
-func (fu *fuseUnmounter) Unmount() error {
- if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) {
- if err := mountutil.Unmount(fu.path); err != nil {
- return err
- }
- }
-
- return fu.finish(time.Second * 5)
+func (u *mountServiceUnmounter) Unmount() error {
+ _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID})
+ return err
}
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
deleted file mode 100644
index 56ba98b..0000000
--- a/pkg/driver/mounter_seaweedfs.go
+++ /dev/null
@@ -1,188 +0,0 @@
-package driver
-
-import (
- "fmt"
- "os"
- "path/filepath"
- "strings"
-
- "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/util"
-)
-
-// Implements Mounter
-type seaweedFsMounter struct {
- volumeID string
- path string
- collection string
- readOnly bool
- driver *SeaweedFsDriver
- volContext map[string]string
-}
-
-type seaweedFsUnmounter struct {
- unmounter Unmounter
- cacheDir string
-}
-
-const (
- seaweedFsCmd = "weed"
-)
-
-func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- return &seaweedFsMounter{
- volumeID: volumeID,
- path: path,
- collection: collection,
- readOnly: readOnly,
- driver: driver,
- volContext: volContext,
- }, nil
-}
-
-func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
- glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target)
-
- var filers []string
- for _, address := range seaweedFs.driver.filers {
- filers = append(filers, string(address))
- }
-
- // CacheDirForRead should be always defined - we use temp dir in case it is not defined
- // we need to use predictable cache path, because we need to clean it up on unstage
- cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID)
-
- // Final args
- args := []string{
- "-logtostderr=true",
- "mount",
- "-dirAutoCreate=true",
- "-umask=000",
- fmt.Sprintf("-dir=%s", target),
- fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)),
- fmt.Sprintf("-cacheDir=%s", cacheDir),
- }
-
- if seaweedFs.readOnly {
- args = append(args, "-readOnly")
- }
-
- // Values for override-able args
- // Whitelist for merging with volContext
- argsMap := map[string]string{
- "collection": seaweedFs.collection,
- "filer": strings.Join(filers, ","),
- "filer.path": seaweedFs.path,
- "cacheCapacityMB": fmt.Sprint(seaweedFs.driver.CacheCapacityMB),
- "concurrentWriters": fmt.Sprint(seaweedFs.driver.ConcurrentWriters),
- "map.uid": seaweedFs.driver.UidMap,
- "map.gid": seaweedFs.driver.GidMap,
- "disk": "",
- "dataCenter": "",
- "replication": "",
- "ttl": "",
- "chunkSizeLimitMB": "",
- "volumeServerAccess": "",
- "readRetryTime": "",
- }
-
- // Handle DataLocality
- dataLocality := seaweedFs.driver.DataLocality
- // Try to override when set in context
- if dataLocalityStr, ok := seaweedFs.volContext["dataLocality"]; ok {
- // Convert to enum
- dataLocalityRes, ok := datalocality.FromString(dataLocalityStr)
- if !ok {
- glog.Warning("volumeContext 'dataLocality' invalid")
- } else {
- dataLocality = dataLocalityRes
- }
- }
- if err := CheckDataLocality(&dataLocality, &seaweedFs.driver.DataCenter); err != nil {
- return nil, err
- }
- // Settings based on type
- switch dataLocality {
- case datalocality.Write_preferLocalDc:
- argsMap["dataCenter"] = seaweedFs.driver.DataCenter
- }
-
- // volContext-parameter -> mount-arg
- parameterArgMap := map[string]string{
- "uidMap": "map.uid",
- "gidMap": "map.gid",
- "filerPath": "filer.path",
- // volumeContext has "diskType", but mount-option is "disk", converting for backwards compatability
- "diskType": "disk",
- }
-
- // Explicitly ignored volContext args e.g. handled somewhere else
- ignoreArgs := []string{
- "dataLocality",
- }
-
- // Merge volContext into argsMap with key-mapping
- for arg, value := range seaweedFs.volContext {
- if in_arr(ignoreArgs, arg) {
- continue
- }
-
- // Check if key-mapping exists
- newArg, ok := parameterArgMap[arg]
- if ok {
- arg = newArg
- }
-
- // Check if arg can be applied
- if _, ok := argsMap[arg]; !ok {
- glog.Warningf("VolumeContext '%s' ignored", arg)
- continue
- }
-
- // Write to args-map
- argsMap[arg] = value
- }
-
- // Convert Args-Map to args
- for arg, value := range argsMap {
- if value != "" { // ignore empty values
- args = append(args, fmt.Sprintf("-%s=%s", arg, value))
- }
- }
-
- u, err := fuseMount(target, seaweedFsCmd, args)
- if err != nil {
- glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
- }
-
- return &seaweedFsUnmounter{unmounter: u, cacheDir: cacheDir}, err
-}
-
-func (su *seaweedFsUnmounter) Unmount() error {
- err := su.unmounter.Unmount()
- err2 := os.RemoveAll(su.cacheDir)
- if err2 != nil {
- glog.Warningf("error removing cache from: %s, err: %v", su.cacheDir, err2)
- }
- return err
-}
-
-func GetLocalSocket(volumeID string) string {
- montDirHash := util.HashToInt32([]byte(volumeID))
- if montDirHash < 0 {
- montDirHash = -montDirHash
- }
-
- socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash)
- return socket
-}
-
-func in_arr(arr []string, val string) bool {
- for _, v := range arr {
- if val == v {
- return true
- }
- }
- return false
-}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 21e68bf..9371d09 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -185,11 +185,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
// Note: The returned Volume won't have an unmounter, so Unstage will need special handling.
func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume {
return &Volume{
- VolumeId: volumeID,
- StagedPath: stagingPath,
- localSocket: GetLocalSocket(volumeID),
+ VolumeId: volumeID,
+ StagedPath: stagingPath,
+ driver: ns.Driver,
// mounter and unmounter are nil - this is intentional
// The FUSE process is already running, we just need to track the volume
+ // The mount service will have the mount tracked if it's still alive
}
}
@@ -344,17 +345,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}
func (ns *NodeServer) NodeCleanup() {
- ns.volumes.Range(func(_, vol any) bool {
- v := vol.(*Volume)
- if len(v.StagedPath) > 0 {
- glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath)
- err := v.Unstage(v.StagedPath)
- if err != nil {
- glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err)
- }
- }
- return true
- })
+ glog.Infof("node cleanup skipped; mount service retains mounts across restarts")
}
func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex {
@@ -373,7 +364,7 @@ func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volCont
return nil, err
}
- volume := NewVolume(volumeID, mounter)
+ volume := NewVolume(volumeID, mounter, ns.Driver)
if err := volume.Stage(stagingTargetPath); err != nil {
return nil, err
}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 6cc777b..080595e 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -1,14 +1,10 @@
package driver
import (
- "context"
- "fmt"
"os"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)
@@ -18,16 +14,14 @@ type Volume struct {
mounter Mounter
unmounter Unmounter
-
- // unix socket used to manage volume
- localSocket string
+ driver *SeaweedFsDriver
}
-func NewVolume(volumeID string, mounter Mounter) *Volume {
+func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume {
return &Volume{
- VolumeId: volumeID,
- mounter: mounter,
- localSocket: GetLocalSocket(volumeID),
+ VolumeId: volumeID,
+ mounter: mounter,
+ driver: driver,
}
}
@@ -81,22 +75,18 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly
}
func (vol *Volume) Quota(sizeByte int64) error {
- target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
-
- clientConn, err := grpc.Dial(target, dialOption)
+ client, err := mountmanager.NewClient(vol.driver.mountEndpoint)
if err != nil {
return err
}
- defer clientConn.Close()
// We can't create PV of zero size, so we're using quota of 1 byte to define no quota.
if sizeByte == 1 {
sizeByte = 0
}
- client := mount_pb.NewSeaweedMountClient(clientConn)
- _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
+ _, err = client.Configure(&mountmanager.ConfigureRequest{
+ VolumeID: vol.VolumeId,
CollectionCapacity: sizeByte,
})
return err