aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- cmd/seaweedfs-csi-driver/main.go | 3
-rw-r--r-- cmd/seaweedfs-mount/Dockerfile | 20
-rw-r--r-- cmd/seaweedfs-mount/main.go | 153
-rw-r--r-- pkg/driver/driver.go | 6
-rw-r--r-- pkg/driver/mounter.go | 150
-rw-r--r-- pkg/driver/mounter_seaweedfs.go | 188
-rw-r--r-- pkg/driver/nodeserver.go | 21
-rw-r--r-- pkg/driver/volume.go | 28
-rw-r--r-- pkg/mountmanager/client.go | 110
-rw-r--r-- pkg/mountmanager/endpoint.go | 15
-rw-r--r-- pkg/mountmanager/keymutex.go | 21
-rw-r--r-- pkg/mountmanager/manager.go | 490
-rw-r--r-- pkg/mountmanager/socket.go | 16
-rw-r--r-- pkg/mountmanager/types.go | 49
14 files changed, 951 insertions, 319 deletions
diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go
index f749db6..45bb810 100644
--- a/cmd/seaweedfs-csi-driver/main.go
+++ b/cmd/seaweedfs-csi-driver/main.go
@@ -19,6 +19,7 @@ var (
filer = flag.String("filer", "localhost:8888", "filer server")
endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
+ mountEndpoint = flag.String("mountEndpoint", "unix:///tmp/seaweedfs-mount.sock", "mount service endpoint")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
concurrentWriters = flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
@@ -78,7 +79,7 @@ func main() {
glog.Infof("connect to filer %s", *filer)
- drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *enableAttacher)
+ drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *mountEndpoint, *enableAttacher)
drv.RunNode = runNode
drv.RunController = runController
diff --git a/cmd/seaweedfs-mount/Dockerfile b/cmd/seaweedfs-mount/Dockerfile
new file mode 100644
index 0000000..689037b
--- /dev/null
+++ b/cmd/seaweedfs-mount/Dockerfile
@@ -0,0 +1,20 @@
+# Build stage: compiles the weed binary and the seaweedfs-mount service.
+FROM golang:1.23-alpine AS builder
+
+# git is needed for the clones below; g++ provides a C/C++ toolchain for the builds.
+RUN apk add git g++
+
+# Build weed from a fresh clone of the upstream repository; the resulting
+# binary is later copied from /go/bin/weed.
+RUN mkdir -p /go/src/github.com/seaweedfs/
+RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
+RUN cd /go/src/github.com/seaweedfs/seaweedfs/weed && go install
+
+# Build the mount service from a clone of the CSI driver repository.
+# NOTE(review): this clones the zemul fork rather than the local build context - confirm this is intended.
+RUN mkdir -p /go/src/github.com/zemul/
+RUN git clone https://github.com/zemul/seaweedfs-csi-driver /go/src/github.com/zemul/seaweedfs-csi-driver
+RUN cd /go/src/github.com/zemul/seaweedfs-csi-driver && \
+ go build -ldflags="-s -w" -o /seaweedfs-mount ./cmd/seaweedfs-mount/main.go
+
+# Runtime stage: only fuse plus the two binaries built above.
+FROM alpine AS final
+RUN apk add fuse
+COPY --from=builder /go/bin/weed /usr/bin/
+COPY --from=builder /seaweedfs-mount /
+
+RUN chmod +x /seaweedfs-mount
+ENTRYPOINT ["/seaweedfs-mount"]
\ No newline at end of file
diff --git a/cmd/seaweedfs-mount/main.go b/cmd/seaweedfs-mount/main.go
new file mode 100644
index 0000000..174b530
--- /dev/null
+++ b/cmd/seaweedfs-mount/main.go
@@ -0,0 +1,153 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// Command-line flags of the mount service.
+var (
+ // endpoint is the address the service listens on; main rejects any scheme other than "unix".
+ endpoint = flag.String("endpoint", "unix:///tmp/seaweedfs-mount.sock", "endpoint the mount service listens on")
+ // weedBinary is the weed executable path passed to the mount manager.
+ weedBinary = flag.String("weedBinary", mountmanager.DefaultWeedBinary, "path to the weed binary")
+)
+
+// main parses the flags, binds the Unix socket named by -endpoint and serves
+// the mount manager's HTTP API (/mount, /unmount, /configure, /healthz) until
+// SIGINT/SIGTERM is received or the HTTP server fails.
+func main() {
+	flag.Parse()
+
+	scheme, address, err := mountmanager.ParseEndpoint(*endpoint)
+	if err != nil {
+		glog.Fatalf("invalid endpoint: %v", err)
+	}
+	if scheme != "unix" {
+		glog.Fatalf("unsupported endpoint scheme: %s", scheme)
+	}
+
+	// A socket file left behind by a previous run would make Listen fail.
+	if err := os.Remove(address); err != nil && !errors.Is(err, os.ErrNotExist) {
+		glog.Fatalf("removing existing socket: %v", err)
+	}
+
+	listener, err := net.Listen("unix", address)
+	if err != nil {
+		glog.Fatalf("failed to listen on %s: %v", address, err)
+	}
+	// cleanup is best effort: the listener may already be closed by
+	// server.Shutdown and the socket file may already be gone, so both
+	// errors are intentionally ignored.
+	cleanup := func() {
+		_ = listener.Close()
+		_ = os.Remove(address)
+	}
+	defer cleanup()
+
+	manager := mountmanager.NewManager(mountmanager.Config{WeedBinary: *weedBinary})
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/mount", handlePost(manager.Mount))
+	mux.HandleFunc("/unmount", handlePost(manager.Unmount))
+	mux.HandleFunc("/configure", handlePost(manager.Configure))
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte("ok"))
+	})
+
+	server := &http.Server{Handler: mux}
+
+	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer stop()
+
+	// Serve failures are reported back to main instead of calling
+	// glog.Fatalf inside the goroutine: a fatal log is expected to exit the
+	// process immediately, which would skip the socket cleanup.
+	serveErr := make(chan error, 1)
+	go func() {
+		if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			serveErr <- err
+		}
+	}()
+
+	glog.Infof("mount service listening on %s", *endpoint)
+
+	select {
+	case <-ctx.Done():
+	case err := <-serveErr:
+		// Deferred calls do not run once Fatalf exits, so clean up explicitly.
+		cleanup()
+		glog.Fatalf("server error: %v", err)
+	}
+
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := server.Shutdown(shutdownCtx); err != nil {
+		glog.Errorf("server shutdown error: %v", err)
+	}
+
+	glog.Infof("mount service stopped")
+}
+
+// handlePost adapts a manager call into an HTTP handler. The handler answers
+// non-POST requests with 405, decodes the JSON request body into a Req,
+// invokes call, and writes either the response as JSON with status 200 or
+// the error text with status 400.
+func handlePost[Req, Resp any](call func(*Req) (*Resp, error)) func(http.ResponseWriter, *http.Request) {
+	return func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+			return
+		}
+
+		var req Req
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+			writeError(w, http.StatusBadRequest, "invalid request: "+err.Error())
+			return
+		}
+
+		resp, err := call(&req)
+		if err != nil {
+			writeError(w, http.StatusBadRequest, err.Error())
+			return
+		}
+
+		writeJSON(w, http.StatusOK, resp)
+	}
+}
+
+// writeJSON sends data as a JSON body with the given HTTP status code.
+// An encoding or write failure is only logged, because the status line has
+// already been sent by then.
+func writeJSON(w http.ResponseWriter, status int, data interface{}) {
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(status)
+
+	encodeErr := json.NewEncoder(w).Encode(data)
+	if encodeErr == nil {
+		return
+	}
+	glog.Errorf("writing response failed: %v", encodeErr)
+}
+
+// writeError reports message to the client as a JSON ErrorResponse with the
+// given HTTP status code.
+func writeError(w http.ResponseWriter, status int, message string) {
+	payload := mountmanager.ErrorResponse{Error: message}
+	writeJSON(w, status, payload)
+}
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 4e09a93..d162369 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -27,7 +27,8 @@ type SeaweedFsDriver struct {
nodeID string
version string
- endpoint string
+ endpoint string
+ mountEndpoint string
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
@@ -48,7 +49,7 @@ type SeaweedFsDriver struct {
RunController bool
}
-func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher bool) *SeaweedFsDriver {
+func NewSeaweedFsDriver(name, filer, nodeID, endpoint, mountEndpoint string, enableAttacher bool) *SeaweedFsDriver {
glog.Infof("Driver: %v version: %v", name, version)
@@ -56,6 +57,7 @@ func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher boo
n := &SeaweedFsDriver{
endpoint: endpoint,
+ mountEndpoint: mountEndpoint,
nodeID: nodeID,
name: name,
version: version,
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index 756f671..6beee55 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -1,24 +1,12 @@
package driver
import (
- "context"
"fmt"
- "os"
- "syscall"
- "time"
-
- "os/exec"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "k8s.io/mount-utils"
)
-// Config holds values to configure the driver
-type Config struct {
- // Region string
- Filer string
-}
-
type Unmounter interface {
Unmount() error
}
@@ -27,107 +15,81 @@ type Mounter interface {
Mount(target string) (Unmounter, error)
}
-type fuseUnmounter struct {
- path string
- cmd *exec.Cmd
+type mountServiceMounter struct {
+ driver *SeaweedFsDriver
+ volumeID string
+ readOnly bool
+ volContext map[string]string
+ client *mountmanager.Client
+}
- finished chan struct{}
+type mountServiceUnmounter struct {
+ client *mountmanager.Client
+ volumeID string
}
func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- path, ok := volContext["path"]
- if !ok {
- path = fmt.Sprintf("/buckets/%s", volumeID)
+ client, err := mountmanager.NewClient(driver.mountEndpoint)
+ if err != nil {
+ return nil, err
}
- collection, ok := volContext["collection"]
- if !ok {
- collection = volumeID
+ contextCopy := make(map[string]string, len(volContext))
+ for k, v := range volContext {
+ contextCopy[k] = v
}
- return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext)
+ return &mountServiceMounter{
+ driver: driver,
+ volumeID: volumeID,
+ readOnly: readOnly,
+ volContext: contextCopy,
+ client: client,
+ }, nil
}
-func fuseMount(path string, command string, args []string) (Unmounter, error) {
- cmd := exec.Command(command, args...)
- glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args)
-
- // log fuse process messages - we need an easy way to investigate crashes in case it happens
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stdout
-
- err := cmd.Start()
- if err != nil {
- glog.Errorf("running weed mount: %v", err)
- return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err)
+func (m *mountServiceMounter) Mount(target string) (Unmounter, error) {
+ if target == "" {
+ return nil, fmt.Errorf("target path is required")
}
- fu := &fuseUnmounter{
- path: path,
- cmd: cmd,
-
- finished: make(chan struct{}),
+ filers := make([]string, len(m.driver.filers))
+ for i, address := range m.driver.filers {
+ filers[i] = string(address)
}
- // avoid zombie processes
- go func() {
- if err := cmd.Wait(); err != nil {
- glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err)
- } else {
- glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path)
- }
-
- // make sure we'll have no stale mounts
- time.Sleep(time.Millisecond * 100)
- _ = mountutil.Unmount(path)
-
- close(fu.finished)
- }()
-
- if err = waitForMount(path, 10*time.Second); err != nil {
- glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path)
+ req := &mountmanager.MountRequest{
+ VolumeID: m.volumeID,
+ TargetPath: target,
+ ReadOnly: m.readOnly,
+ Filers: filers,
+ CacheDir: m.driver.CacheDir,
+ CacheCapacityMB: m.driver.CacheCapacityMB,
+ ConcurrentWriters: m.driver.ConcurrentWriters,
+ UidMap: m.driver.UidMap,
+ GidMap: m.driver.GidMap,
+ DataCenter: m.driver.DataCenter,
+ DataLocality: m.driver.DataLocality.String(),
+ VolumeContext: m.volContext,
+ }
- _ = fu.finish(time.Second * 10)
+ resp, err := m.client.Mount(req)
+ if err != nil {
return nil, err
- } else {
- return fu, nil
}
-}
-func (fu *fuseUnmounter) finish(timeout time.Duration) error {
- // ignore error, just inform we want process to exit
- // SIGHUP is used to reload weed config - we need to use SIGTERM
- _ = fu.cmd.Process.Signal(syscall.SIGTERM)
-
- if err := fu.waitFinished(timeout); err != nil {
- glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- _ = fu.cmd.Process.Kill()
- if err = fu.waitFinished(time.Second * 1); err != nil {
- glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path)
- return err
- }
+ expectedSocket := mountmanager.LocalSocketPath(m.volumeID)
+ if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket {
+ glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket)
}
- return nil
+ return &mountServiceUnmounter{
+ client: m.client,
+ volumeID: m.volumeID,
+ }, nil
}
-func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- select {
- case <-ctx.Done():
- return context.DeadlineExceeded
- case <-fu.finished:
- return nil
- }
-}
-
-func (fu *fuseUnmounter) Unmount() error {
- if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) {
- if err := mountutil.Unmount(fu.path); err != nil {
- return err
- }
- }
-
- return fu.finish(time.Second * 5)
+func (u *mountServiceUnmounter) Unmount() error {
+ _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID})
+ return err
}
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
deleted file mode 100644
index 56ba98b..0000000
--- a/pkg/driver/mounter_seaweedfs.go
+++ /dev/null
@@ -1,188 +0,0 @@
-package driver
-
-import (
- "fmt"
- "os"
- "path/filepath"
- "strings"
-
- "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/util"
-)
-
-// Implements Mounter
-type seaweedFsMounter struct {
- volumeID string
- path string
- collection string
- readOnly bool
- driver *SeaweedFsDriver
- volContext map[string]string
-}
-
-type seaweedFsUnmounter struct {
- unmounter Unmounter
- cacheDir string
-}
-
-const (
- seaweedFsCmd = "weed"
-)
-
-func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- return &seaweedFsMounter{
- volumeID: volumeID,
- path: path,
- collection: collection,
- readOnly: readOnly,
- driver: driver,
- volContext: volContext,
- }, nil
-}
-
-func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
- glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target)
-
- var filers []string
- for _, address := range seaweedFs.driver.filers {
- filers = append(filers, string(address))
- }
-
- // CacheDirForRead should be always defined - we use temp dir in case it is not defined
- // we need to use predictable cache path, because we need to clean it up on unstage
- cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID)
-
- // Final args
- args := []string{
- "-logtostderr=true",
- "mount",
- "-dirAutoCreate=true",
- "-umask=000",
- fmt.Sprintf("-dir=%s", target),
- fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)),
- fmt.Sprintf("-cacheDir=%s", cacheDir),
- }
-
- if seaweedFs.readOnly {
- args = append(args, "-readOnly")
- }
-
- // Values for override-able args
- // Whitelist for merging with volContext
- argsMap := map[string]string{
- "collection": seaweedFs.collection,
- "filer": strings.Join(filers, ","),
- "filer.path": seaweedFs.path,
- "cacheCapacityMB": fmt.Sprint(seaweedFs.driver.CacheCapacityMB),
- "concurrentWriters": fmt.Sprint(seaweedFs.driver.ConcurrentWriters),
- "map.uid": seaweedFs.driver.UidMap,
- "map.gid": seaweedFs.driver.GidMap,
- "disk": "",
- "dataCenter": "",
- "replication": "",
- "ttl": "",
- "chunkSizeLimitMB": "",
- "volumeServerAccess": "",
- "readRetryTime": "",
- }
-
- // Handle DataLocality
- dataLocality := seaweedFs.driver.DataLocality
- // Try to override when set in context
- if dataLocalityStr, ok := seaweedFs.volContext["dataLocality"]; ok {
- // Convert to enum
- dataLocalityRes, ok := datalocality.FromString(dataLocalityStr)
- if !ok {
- glog.Warning("volumeContext 'dataLocality' invalid")
- } else {
- dataLocality = dataLocalityRes
- }
- }
- if err := CheckDataLocality(&dataLocality, &seaweedFs.driver.DataCenter); err != nil {
- return nil, err
- }
- // Settings based on type
- switch dataLocality {
- case datalocality.Write_preferLocalDc:
- argsMap["dataCenter"] = seaweedFs.driver.DataCenter
- }
-
- // volContext-parameter -> mount-arg
- parameterArgMap := map[string]string{
- "uidMap": "map.uid",
- "gidMap": "map.gid",
- "filerPath": "filer.path",
- // volumeContext has "diskType", but mount-option is "disk", converting for backwards compatability
- "diskType": "disk",
- }
-
- // Explicitly ignored volContext args e.g. handled somewhere else
- ignoreArgs := []string{
- "dataLocality",
- }
-
- // Merge volContext into argsMap with key-mapping
- for arg, value := range seaweedFs.volContext {
- if in_arr(ignoreArgs, arg) {
- continue
- }
-
- // Check if key-mapping exists
- newArg, ok := parameterArgMap[arg]
- if ok {
- arg = newArg
- }
-
- // Check if arg can be applied
- if _, ok := argsMap[arg]; !ok {
- glog.Warningf("VolumeContext '%s' ignored", arg)
- continue
- }
-
- // Write to args-map
- argsMap[arg] = value
- }
-
- // Convert Args-Map to args
- for arg, value := range argsMap {
- if value != "" { // ignore empty values
- args = append(args, fmt.Sprintf("-%s=%s", arg, value))
- }
- }
-
- u, err := fuseMount(target, seaweedFsCmd, args)
- if err != nil {
- glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
- }
-
- return &seaweedFsUnmounter{unmounter: u, cacheDir: cacheDir}, err
-}
-
-func (su *seaweedFsUnmounter) Unmount() error {
- err := su.unmounter.Unmount()
- err2 := os.RemoveAll(su.cacheDir)
- if err2 != nil {
- glog.Warningf("error removing cache from: %s, err: %v", su.cacheDir, err2)
- }
- return err
-}
-
-func GetLocalSocket(volumeID string) string {
- montDirHash := util.HashToInt32([]byte(volumeID))
- if montDirHash < 0 {
- montDirHash = -montDirHash
- }
-
- socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash)
- return socket
-}
-
-func in_arr(arr []string, val string) bool {
- for _, v := range arr {
- if val == v {
- return true
- }
- }
- return false
-}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 21e68bf..9371d09 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -185,11 +185,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
// Note: The returned Volume won't have an unmounter, so Unstage will need special handling.
func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume {
return &Volume{
- VolumeId: volumeID,
- StagedPath: stagingPath,
- localSocket: GetLocalSocket(volumeID),
+ VolumeId: volumeID,
+ StagedPath: stagingPath,
+ driver: ns.Driver,
// mounter and unmounter are nil - this is intentional
// The FUSE process is already running, we just need to track the volume
+ // The mount service will have the mount tracked if it's still alive
}
}
@@ -344,17 +345,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}
func (ns *NodeServer) NodeCleanup() {
- ns.volumes.Range(func(_, vol any) bool {
- v := vol.(*Volume)
- if len(v.StagedPath) > 0 {
- glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath)
- err := v.Unstage(v.StagedPath)
- if err != nil {
- glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err)
- }
- }
- return true
- })
+ glog.Infof("node cleanup skipped; mount service retains mounts across restarts")
}
func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex {
@@ -373,7 +364,7 @@ func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volCont
return nil, err
}
- volume := NewVolume(volumeID, mounter)
+ volume := NewVolume(volumeID, mounter, ns.Driver)
if err := volume.Stage(stagingTargetPath); err != nil {
return nil, err
}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 6cc777b..080595e 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -1,14 +1,10 @@
package driver
import (
- "context"
- "fmt"
"os"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)
@@ -18,16 +14,14 @@ type Volume struct {
mounter Mounter
unmounter Unmounter
-
- // unix socket used to manage volume
- localSocket string
+ driver *SeaweedFsDriver
}
-func NewVolume(volumeID string, mounter Mounter) *Volume {
+func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume {
return &Volume{
- VolumeId: volumeID,
- mounter: mounter,
- localSocket: GetLocalSocket(volumeID),
+ VolumeId: volumeID,
+ mounter: mounter,
+ driver: driver,
}
}
@@ -81,22 +75,18 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly
}
func (vol *Volume) Quota(sizeByte int64) error {
- target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
-
- clientConn, err := grpc.Dial(target, dialOption)
+ client, err := mountmanager.NewClient(vol.driver.mountEndpoint)
if err != nil {
return err
}
- defer clientConn.Close()
// We can't create PV of zero size, so we're using quota of 1 byte to define no quota.
if sizeByte == 1 {
sizeByte = 0
}
- client := mount_pb.NewSeaweedMountClient(clientConn)
- _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
+ _, err = client.Configure(&mountmanager.ConfigureRequest{
+ VolumeID: vol.VolumeId,
CollectionCapacity: sizeByte,
})
return err
diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go
new file mode 100644
index 0000000..8a53d00
--- /dev/null
+++ b/pkg/mountmanager/client.go
@@ -0,0 +1,110 @@
+package mountmanager
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "time"
+)
+
+// Client talks to the mount service over a Unix domain socket.
+// It sends JSON-encoded POST requests through httpClient.
+type Client struct {
+ // httpClient carries the transport that dials the service socket and the request timeout.
+ httpClient *http.Client
+ // baseURL is prefixed to every request path; NewClient sets it to a placeholder
+ // host because the transport dials the socket address regardless of the URL host.
+ baseURL string
+}
+
+// NewClient builds a new Client for the given endpoint.
+//
+// endpoint must have the form "unix://<socket path>"; any other scheme, or an
+// endpoint ParseEndpoint rejects, yields an error. No connection is opened
+// here - the socket is dialed when a request is sent.
+func NewClient(endpoint string) (*Client, error) {
+	scheme, address, err := ParseEndpoint(endpoint)
+	if err != nil {
+		return nil, err
+	}
+	if scheme != "unix" {
+		return nil, fmt.Errorf("unsupported endpoint scheme: %s", scheme)
+	}
+
+	dialer := &net.Dialer{}
+	transport := &http.Transport{
+		// Every Client owns a private transport and callers create clients
+		// per operation. With keep-alives enabled and no idle timeout, each
+		// discarded client would leave an idle socket connection (and its
+		// goroutines) behind, so connections are closed after each request.
+		DisableKeepAlives: true,
+		// The URL host is a placeholder; always dial the service socket.
+		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+			return dialer.DialContext(ctx, "unix", address)
+		},
+	}
+
+	return &Client{
+		httpClient: &http.Client{
+			Timeout:   30 * time.Second,
+			Transport: transport,
+		},
+		baseURL: "http://unix",
+	}, nil
+}
+
+// Mount posts req to the service's /mount route and returns the decoded
+// response, or nil and the error when the call fails.
+func (c *Client) Mount(req *MountRequest) (*MountResponse, error) {
+	result := new(MountResponse)
+	err := c.doPost("/mount", req, result)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// Unmount posts req to the service's /unmount route and returns the decoded
+// response, or nil and the error when the call fails.
+func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) {
+	result := new(UnmountResponse)
+	err := c.doPost("/unmount", req, result)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// Configure posts req to the service's /configure route, which updates
+// runtime options such as quota for an existing mount, and returns the
+// decoded response, or nil and the error when the call fails.
+func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
+	result := new(ConfigureResponse)
+	err := c.doPost("/configure", req, result)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// doPost JSON-encodes payload, POSTs it to path on the mount service and
+// decodes a successful JSON response into out (when out is non-nil).
+//
+// A response status of 400 or above is turned into an error: the message of
+// the service's ErrorResponse when the body carries one, otherwise the HTTP
+// status together with the raw body text.
+func (c *Client) doPost(path string, payload any, out any) error {
+	body := &bytes.Buffer{}
+	if err := json.NewEncoder(body).Encode(payload); err != nil {
+		return fmt.Errorf("encode request: %w", err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, c.baseURL+path, body)
+	if err != nil {
+		return fmt.Errorf("build request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("call mount service: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode >= 400 {
+		// Read the body once and keep the bytes: decoding straight from the
+		// stream would consume it, leaving nothing for the fallback message
+		// when the body is not a JSON ErrorResponse. A read error is ignored
+		// on purpose - whatever was read still makes the best error text.
+		data, _ := io.ReadAll(resp.Body)
+
+		var errResp ErrorResponse
+		if err := json.Unmarshal(data, &errResp); err == nil && errResp.Error != "" {
+			return errors.New(errResp.Error)
+		}
+		return fmt.Errorf("mount service error: %s (%s)", resp.Status, string(data))
+	}
+
+	if out == nil {
+		_, _ = io.Copy(io.Discard, resp.Body)
+		return nil
+	}
+
+	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
+		return fmt.Errorf("decode response: %w", err)
+	}
+	return nil
+}
diff --git a/pkg/mountmanager/endpoint.go b/pkg/mountmanager/endpoint.go
new file mode 100644
index 0000000..1424536
--- /dev/null
+++ b/pkg/mountmanager/endpoint.go
@@ -0,0 +1,15 @@
+package mountmanager
+
+import (
+ "fmt"
+ "strings"
+)
+
+// ParseEndpoint splits an endpoint such as "unix:///path" at the first "://".
+// It returns the lower-cased scheme and the untouched remainder as address.
+// An endpoint without "://", or with nothing after it, is reported as invalid.
+func ParseEndpoint(endpoint string) (scheme, address string, err error) {
+	rawScheme, rest, found := strings.Cut(endpoint, "://")
+	if !found || rest == "" {
+		return "", "", fmt.Errorf("invalid endpoint: %s", endpoint)
+	}
+	return strings.ToLower(rawScheme), rest, nil
+}
diff --git a/pkg/mountmanager/keymutex.go b/pkg/mountmanager/keymutex.go
new file mode 100644
index 0000000..0c64dcc
--- /dev/null
+++ b/pkg/mountmanager/keymutex.go
@@ -0,0 +1,21 @@
+package mountmanager
+
+import "sync"
+
+// keyMutex hands out one mutex per string key so that operations on the same
+// key (a volume) can be serialized.
+type keyMutex struct {
+	// mutexes maps a key to its *sync.Mutex.
+	mutexes sync.Map
+}
+
+// newKeyMutex returns an empty keyMutex.
+func newKeyMutex() *keyMutex {
+	return new(keyMutex)
+}
+
+// get returns the mutex registered for key, registering a new one first when
+// the key has none yet.
+func (km *keyMutex) get(key string) *sync.Mutex {
+	stored, _ := km.mutexes.LoadOrStore(key, new(sync.Mutex))
+	mu := stored.(*sync.Mutex)
+	return mu
+}
+
+// delete drops the mutex registered for key; a later get registers a new one.
+func (km *keyMutex) delete(key string) {
+	km.mutexes.Delete(key)
+}
diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go
new file mode 100644
index 0000000..0553af5
--- /dev/null
+++ b/pkg/mountmanager/manager.go
@@ -0,0 +1,490 @@
+package mountmanager
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "k8s.io/mount-utils"
+)
+
+// kubeMounter is the shared mount-utils handle used to probe and unmount target paths.
+var kubeMounter = mount.New("")
+
+// Manager owns weed mount processes and exposes helpers to start and stop them.
+type Manager struct {
+ // weedBinary is the weed executable used to start mount processes.
+ weedBinary string
+
+ // mu guards mounts.
+ mu sync.Mutex
+ // mounts holds the tracked mount entries keyed by volume ID.
+ mounts map[string]*mountEntry
+ // locks provides one mutex per volume ID; Mount, Unmount and Configure hold it for the whole operation.
+ locks *keyMutex
+}
+
+// Config configures a Manager instance.
+type Config struct {
+ // WeedBinary is the path of the weed executable; NewManager substitutes DefaultWeedBinary when it is empty.
+ WeedBinary string
+}
+
+// NewManager builds a Manager with no tracked mounts. The weed binary comes
+// from cfg.WeedBinary, or DefaultWeedBinary when that field is empty.
+func NewManager(cfg Config) *Manager {
+	mgr := &Manager{
+		weedBinary: DefaultWeedBinary,
+		mounts:     make(map[string]*mountEntry),
+		locks:      newKeyMutex(),
+	}
+	if cfg.WeedBinary != "" {
+		mgr.weedBinary = cfg.WeedBinary
+	}
+	return mgr
+}
+
+// Mount starts a weed mount process using the provided request.
+//
+// The request is validated first, then the per-volume lock is held for the
+// rest of the call. A volume that is already tracked at the same target path
+// is reported as success with its existing socket; a tracked volume at a
+// different target path is an error. Otherwise the process is started, the
+// entry is recorded and a watcher goroutine is launched for it.
+func (m *Manager) Mount(req *MountRequest) (*MountResponse, error) {
+ if req == nil {
+ return nil, errors.New("mount request is nil")
+ }
+ if err := validateMountRequest(req); err != nil {
+ return nil, err
+ }
+
+ // Serialize all operations on this volume.
+ lock := m.locks.get(req.VolumeID)
+ lock.Lock()
+ defer lock.Unlock()
+
+ if entry := m.getMount(req.VolumeID); entry != nil {
+ if entry.targetPath == req.TargetPath {
+ glog.V(1).Infof("volume %s already mounted at %s", req.VolumeID, req.TargetPath)
+ return &MountResponse{LocalSocket: entry.localSocket}, nil
+ }
+ return nil, fmt.Errorf("volume %s already mounted at %s", req.VolumeID, entry.targetPath)
+ }
+
+ entry, err := m.startMount(req)
+ if err != nil {
+ return nil, err
+ }
+
+ m.mu.Lock()
+ m.mounts[req.VolumeID] = entry
+ m.mu.Unlock()
+
+ // The watcher is started only after the entry is recorded, so it can compare
+ // the recorded entry against its own when the process ends.
+ go m.watchMount(req.VolumeID, entry)
+
+ return &MountResponse{LocalSocket: entry.localSocket}, nil
+}
+
+// Unmount terminates the weed mount process associated with the provided request.
+//
+// It holds the per-volume lock for the whole call. A volume that is not
+// tracked is reported as success. Otherwise the entry is removed from the
+// tracking map, its process is stopped and its cache directory is deleted.
+func (m *Manager) Unmount(req *UnmountRequest) (*UnmountResponse, error) {
+ if req == nil {
+ return nil, errors.New("unmount request is nil")
+ }
+ if req.VolumeID == "" {
+ return nil, errors.New("volumeId is required")
+ }
+
+ lock := m.locks.get(req.VolumeID)
+ lock.Lock()
+ defer lock.Unlock()
+
+ // The entry is dropped from the map before the process is stopped.
+ // NOTE(review): if stop fails below, the mount is no longer tracked even
+ // though its process may still be running - confirm this is acceptable.
+ entry := m.removeMount(req.VolumeID)
+ if entry == nil {
+ glog.V(1).Infof("volume %s not mounted", req.VolumeID)
+ return &UnmountResponse{}, nil
+ }
+
+ // The cache directory is removed on every return path below, including a failed stop.
+ defer os.RemoveAll(entry.cacheDir)
+
+ if err := entry.process.stop(); err != nil {
+ return nil, err
+ }
+
+ return &UnmountResponse{}, nil
+}
+
+// getMount looks up the tracked entry for volumeID under m.mu and returns it,
+// or nil when the volume is not tracked.
+func (m *Manager) getMount(volumeID string) *mountEntry {
+	m.mu.Lock()
+	entry := m.mounts[volumeID]
+	m.mu.Unlock()
+	return entry
+}
+
+// removeMount deletes the tracked entry for volumeID under m.mu and returns
+// it, or nil when the volume is not tracked.
+//
+// The per-volume mutex in m.locks is deliberately left registered. Unmount
+// calls this while still holding that mutex; removing it from the registry
+// here would let a concurrent request for the same volume obtain a fresh,
+// unlocked mutex and run in parallel with the unmount that is still in
+// progress. Keeping one small mutex per volume ID is the price for that
+// guarantee.
+func (m *Manager) removeMount(volumeID string) *mountEntry {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entry := m.mounts[volumeID]
+	delete(m.mounts, volumeID)
+	return entry
+}
+
+// watchMount blocks until the process of entry has finished and then, if
+// entry is still the one tracked for volumeID, stops tracking it and removes
+// its cache directory.
+//
+// When the tracked entry is gone or has been replaced, Unmount has already
+// taken ownership of this entry and removes the cache directory itself. The
+// watcher must not touch the directory in that case: the path is derived
+// from the volume ID, so a later mount of the same volume reuses it and a
+// late-running watcher would delete the new mount's cache.
+func (m *Manager) watchMount(volumeID string, entry *mountEntry) {
+	<-entry.process.done
+
+	m.mu.Lock()
+	owned := m.mounts[volumeID] == entry
+	if owned {
+		delete(m.mounts, volumeID)
+	}
+	m.mu.Unlock()
+
+	if !owned {
+		return
+	}
+	if err := os.RemoveAll(entry.cacheDir); err != nil {
+		glog.Warningf("error removing cache from: %s, err: %v", entry.cacheDir, err)
+	}
+}
+
+// startMount prepares the target path and the per-volume cache directory,
+// builds the weed mount arguments and starts the mount process. It returns
+// the entry describing the running mount; it does not record it in m.mounts.
+func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) {
+ targetPath := req.TargetPath
+ if err := ensureTargetClean(targetPath); err != nil {
+ return nil, err
+ }
+
+ // The cache directory is <CacheDir>/<VolumeID>, with the OS temp dir as base
+ // when the request names none.
+ cacheBase := req.CacheDir
+ if cacheBase == "" {
+ cacheBase = os.TempDir()
+ }
+ cacheDir := filepath.Join(cacheBase, req.VolumeID)
+ if err := os.MkdirAll(cacheDir, 0750); err != nil {
+ return nil, fmt.Errorf("creating cache dir: %w", err)
+ }
+
+ // NOTE(review): the cache directory created above is not removed when one
+ // of the following steps fails.
+ localSocket := LocalSocketPath(req.VolumeID)
+ args, err := buildMountArgs(req, targetPath, cacheDir, localSocket)
+ if err != nil {
+ return nil, err
+ }
+
+ process, err := startWeedMountProcess(m.weedBinary, args, targetPath)
+ if err != nil {
+ return nil, err
+ }
+
+ return &mountEntry{
+ volumeID: req.VolumeID,
+ targetPath: targetPath,
+ cacheDir: cacheDir,
+ localSocket: localSocket,
+ process: process,
+ }, nil
+}
+
+// ensureTargetClean makes targetPath usable as a fresh mount point.
+// A missing path is created, a corrupted mount is unmounted and the path is
+// checked again, and an existing mount is unmounted. Any other error from the
+// mount-point check, or from an unmount, is returned.
+func ensureTargetClean(targetPath string) error {
+ isMount, err := kubeMounter.IsMountPoint(targetPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return os.MkdirAll(targetPath, 0750)
+ }
+ if mount.IsCorruptedMnt(err) {
+ if err := kubeMounter.Unmount(targetPath); err != nil {
+ return err
+ }
+ // Re-run the check after the unmount.
+ // NOTE(review): the recursion has no depth limit if the path keeps reporting as corrupted.
+ return ensureTargetClean(targetPath)
+ }
+ return err
+ }
+ if isMount {
+ if err := kubeMounter.Unmount(targetPath); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// validateMountRequest checks the mandatory fields of req and returns an
+// error naming the first one that is missing: volume ID, target path, then
+// filers.
+func validateMountRequest(req *MountRequest) error {
+	switch {
+	case req.VolumeID == "":
+		return errors.New("volumeId is required")
+	case req.TargetPath == "":
+		return errors.New("targetPath is required")
+	case len(req.Filers) == 0:
+		return errors.New("at least one filer is required")
+	}
+	return nil
+}
+
+// buildMountArgs assembles the command-line arguments for "weed mount".
+//
+// Fixed flags come first (log target, target directory, local socket, cache
+// directory and optionally -readOnly). They are followed by one "-key=value"
+// flag for every entry of an allow-list of overridable options whose value is
+// non-empty. Defaults for those options come from req and can be overridden
+// by req.VolumeContext. An unknown dataLocality value, or a data locality
+// without a data center, is reported as an error.
+func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) {
+ volumeContext := req.VolumeContext
+ if volumeContext == nil {
+ volumeContext = map[string]string{}
+ }
+
+ // Filer path and collection default to values derived from the volume ID.
+ path := volumeContext["path"]
+ if path == "" {
+ path = fmt.Sprintf("/buckets/%s", req.VolumeID)
+ }
+
+ collection := volumeContext["collection"]
+ if collection == "" {
+ collection = req.VolumeID
+ }
+
+ args := []string{
+ "-logtostderr=true",
+ "mount",
+ "-dirAutoCreate=true",
+ "-umask=000",
+ fmt.Sprintf("-dir=%s", targetPath),
+ fmt.Sprintf("-localSocket=%s", localSocket),
+ fmt.Sprintf("-cacheDir=%s", cacheDir),
+ }
+
+ if req.ReadOnly {
+ args = append(args, "-readOnly")
+ }
+
+ // argsMap doubles as the allow-list: only keys present here can be set from
+ // the volume context, and entries left empty are not passed to weed.
+ argsMap := map[string]string{
+ "collection": collection,
+ "filer": strings.Join(req.Filers, ","),
+ "filer.path": path,
+ "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB),
+ "concurrentWriters": strconv.Itoa(req.ConcurrentWriters),
+ "map.uid": req.UidMap,
+ "map.gid": req.GidMap,
+ "disk": "",
+ "dataCenter": "",
+ "replication": "",
+ "ttl": "",
+ "chunkSizeLimitMB": "",
+ "volumeServerAccess": "",
+ "readRetryTime": "",
+ }
+
+ // Data locality: the request value is the default, the volume context overrides it.
+ dataLocality := datalocality.None
+ if req.DataLocality != "" {
+ if dl, ok := datalocality.FromString(req.DataLocality); ok {
+ dataLocality = dl
+ } else {
+ return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality)
+ }
+ }
+
+ if contextLocality, ok := volumeContext["dataLocality"]; ok {
+ if dl, ok := datalocality.FromString(contextLocality); ok {
+ dataLocality = dl
+ } else {
+ return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality)
+ }
+ }
+
+ if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil {
+ return nil, err
+ }
+
+ switch dataLocality {
+ case datalocality.Write_preferLocalDc:
+ argsMap["dataCenter"] = req.DataCenter
+ }
+
+ // Volume-context parameter names that differ from the weed flag they set.
+ parameterArgMap := map[string]string{
+ "uidMap": "map.uid",
+ "gidMap": "map.gid",
+ "filerPath": "filer.path",
+ "diskType": "disk",
+ }
+
+ // Keys already handled above that must not be merged as flags.
+ ignoredArgs := map[string]struct{}{"dataLocality": {}}
+
+ // Merge the volume context into argsMap; empty values never overwrite a default.
+ // NOTE(review): a "path" key was consumed above but is in neither map, so it
+ // is logged here as ignored.
+ for key, value := range volumeContext {
+ if _, ignored := ignoredArgs[key]; ignored {
+ continue
+ }
+ if mapped, ok := parameterArgMap[key]; ok {
+ key = mapped
+ }
+ if _, ok := argsMap[key]; !ok {
+ glog.Warningf("VolumeContext '%s' ignored", key)
+ continue
+ }
+ if value != "" {
+ argsMap[key] = value
+ }
+ }
+
+ // Map iteration order is unspecified, so the order of these flags varies between calls.
+ for key, value := range argsMap {
+ if value == "" {
+ continue
+ }
+ args = append(args, fmt.Sprintf("-%s=%s", key, value))
+ }
+
+ return args, nil
+}
+
+// checkDataLocality rejects a data locality other than None when no data
+// center name is available; every other combination is accepted.
+func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error {
+	if *dataLocality == datalocality.None || dataCenter != "" {
+		return nil
+	}
+	return fmt.Errorf("dataLocality set, but dataCenter is empty")
+}
+
+// Configure applies mount-level settings (currently the collection capacity)
+// to the weed mount process serving req.VolumeID. It is meant to be called by
+// a trusted caller.
+//
+// The per-volume lock is held for the whole call. An error is returned when
+// req is nil, when the volume has no registered mount, or when dialing or
+// calling the mount process fails.
+func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
+	if req == nil {
+		return nil, errors.New("configure request is nil")
+	}
+
+	volumeID := req.VolumeID
+
+	volumeLock := m.locks.get(volumeID)
+	volumeLock.Lock()
+	defer volumeLock.Unlock()
+
+	mounted := m.getMount(volumeID)
+	if mounted == nil {
+		// No mount is registered for this volume, so its lock entry is dropped again.
+		m.locks.delete(volumeID)
+		return nil, fmt.Errorf("volume %s not mounted", volumeID)
+	}
+
+	processClient, dialErr := newMountProcessClient(mounted.localSocket)
+	if dialErr != nil {
+		return nil, dialErr
+	}
+	defer processClient.Close()
+
+	configureErr := processClient.Configure(req.CollectionCapacity)
+	if configureErr != nil {
+		return nil, configureErr
+	}
+
+	return &ConfigureResponse{}, nil
+}
+
+// mountEntry is the manager's record of one volume's weed mount.
+type mountEntry struct {
+ // volumeID names the volume this entry belongs to.
+ volumeID string
+ // targetPath is presumably the directory the volume is mounted on — confirm against the code that fills the entry.
+ targetPath string
+ // cacheDir is presumably the cache directory handed to weed mount — confirm against the code that fills the entry.
+ cacheDir string
+ // localSocket is the unix socket path Configure dials to reach the weed mount process.
+ localSocket string
+ // process is the handle of the spawned weed mount process.
+ process *weedMountProcess
+}
+
+// weedMountProcess tracks a running "weed mount" child process.
+type weedMountProcess struct {
+ // cmd is the command started by startWeedMountProcess.
+ cmd *exec.Cmd
+ // target is the mount path; wait() unmounts it after the process exits.
+ target string
+ // done is closed by wait() once the process has exited and the unmount
+ // attempt has run; stop() blocks on it.
+ done chan struct{}
+}
+
+// startWeedMountProcess launches command with args, forwarding its output to
+// this process's stdout/stderr, and blocks until target shows up as a mount
+// point (waiting up to 10 seconds).
+//
+// A background goroutine reaps the child for as long as it runs. If the mount
+// does not appear in time the child is stopped and the wait error returned.
+func startWeedMountProcess(command string, args []string, target string) (*weedMountProcess, error) {
+	weedCmd := exec.Command(command, args...)
+	weedCmd.Stdout, weedCmd.Stderr = os.Stdout, os.Stderr
+
+	glog.V(0).Infof("Starting weed mount: %s %s", command, strings.Join(args, " "))
+
+	startErr := weedCmd.Start()
+	if startErr != nil {
+		return nil, fmt.Errorf("starting weed mount: %w", startErr)
+	}
+
+	proc := &weedMountProcess{cmd: weedCmd, target: target, done: make(chan struct{})}
+	go proc.wait()
+
+	mountErr := waitForMount(target, 10*time.Second)
+	if mountErr == nil {
+		return proc, nil
+	}
+
+	// Best-effort cleanup; the mount error is the one worth reporting.
+	_ = proc.stop()
+	return nil, mountErr
+}
+
+// wait blocks until the weed mount process exits, logs how it ended, makes a
+// best-effort attempt to unmount the target, and finally closes p.done.
+func (p *weedMountProcess) wait() {
+	exitErr := p.cmd.Wait()
+	pid := p.cmd.Process.Pid
+
+	if exitErr == nil {
+		glog.Infof("weed mount exit (pid: %d, target: %s)", pid, p.target)
+	} else {
+		glog.Errorf("weed mount exit (pid: %d, target: %s): %v", pid, p.target, exitErr)
+	}
+
+	// NOTE(review): the short pause presumably lets the kernel finish tearing
+	// down the mount before the cleanup unmount — confirm.
+	time.Sleep(100 * time.Millisecond)
+	// The unmount result is deliberately ignored: this is cleanup after the process is gone.
+	_ = kubeMounter.Unmount(p.target)
+
+	close(p.done)
+}
+
+// stop terminates the weed mount process and waits for wait() to finish.
+//
+// It sends SIGTERM and allows 5 seconds for a graceful exit, then sends
+// SIGKILL and allows 1 more second. Signalling failures are only logged; an
+// error is returned solely when the process has still not been reaped after
+// the kill. A process that was never started is a no-op.
+func (p *weedMountProcess) stop() error {
+	if p.cmd.Process == nil {
+		return nil
+	}
+
+	// errors.Is also matches the sentinel if it is ever returned wrapped.
+	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil && !errors.Is(err, os.ErrProcessDone) {
+		glog.Warningf("sending SIGTERM to weed mount failed: %v", err)
+	}
+
+	select {
+	case <-p.done:
+		return nil
+	case <-time.After(5 * time.Second):
+		// Graceful shutdown took too long; escalate below.
+	}
+
+	if err := p.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
+		glog.Warningf("killing weed mount failed: %v", err)
+	}
+
+	select {
+	case <-p.done:
+		return nil
+	case <-time.After(1 * time.Second):
+		return errors.New("timed out waiting for weed mount to stop")
+	}
+}
+
+// waitForMount polls path every 10ms until it is reported as a mount point.
+//
+// It returns nil as soon as the mount is visible, the probe's error if the
+// mount-point check fails, or a timeout error once timeout has elapsed. The
+// timeout is measured against the clock, so time spent inside a slow
+// mount-point check counts towards it. The path is always probed at least
+// once, even for a non-positive timeout.
+func waitForMount(path string, timeout time.Duration) error {
+	const interval = 10 * time.Millisecond
+
+	deadline := time.Now().Add(timeout)
+
+	for {
+		notMount, err := kubeMounter.IsLikelyNotMountPoint(path)
+		if err != nil {
+			return err
+		}
+		if !notMount {
+			return nil
+		}
+
+		if !time.Now().Before(deadline) {
+			return errors.New("timeout waiting for mount")
+		}
+		time.Sleep(interval)
+	}
+}
+
+// mountProcessClient is a gRPC client for a weed mount process's local socket.
+type mountProcessClient struct {
+ // conn is the underlying gRPC connection; Close releases it.
+ conn *grpc.ClientConn
+ // client is the SeaweedMount stub bound to conn.
+ client mount_pb.SeaweedMountClient
+}
+
+// newMountProcessClient opens a gRPC connection, without transport security,
+// to the unix socket at localSocket and wraps it in a SeaweedMount client.
+// The caller must Close the returned client.
+func newMountProcessClient(localSocket string) (*mountProcessClient, error) {
+	dialTarget := fmt.Sprintf("passthrough:///unix://%s", localSocket)
+	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+	conn, dialErr := grpc.Dial(dialTarget, creds)
+	if dialErr != nil {
+		return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, dialErr)
+	}
+
+	wrapped := &mountProcessClient{conn: conn}
+	wrapped.client = mount_pb.NewSeaweedMountClient(conn)
+	return wrapped, nil
+}
+
+// Close shuts down the gRPC connection held by the client and returns the
+// connection's close error, if any.
+func (c *mountProcessClient) Close() error {
+	closeErr := c.conn.Close()
+	return closeErr
+}
+
+// Configure sends the collection capacity to the weed mount process, giving
+// the RPC at most 10 seconds to complete.
+//
+// A capacity of exactly 1 is sent as 0.
+// NOTE(review): 1 presumably acts as a "no quota" sentinel — confirm with the caller.
+func (c *mountProcessClient) Configure(capacity int64) error {
+	effective := capacity
+	if effective == 1 {
+		effective = 0
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	request := &mount_pb.ConfigureRequest{CollectionCapacity: effective}
+	_, rpcErr := c.client.Configure(ctx, request)
+	return rpcErr
+}
diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go
new file mode 100644
index 0000000..bd63146
--- /dev/null
+++ b/pkg/mountmanager/socket.go
@@ -0,0 +1,16 @@
+package mountmanager
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// LocalSocketPath returns the unix socket path used to communicate with the weed mount process.
+func LocalSocketPath(volumeID string) string {
+ hash := util.HashToInt32([]byte(volumeID))
+ if hash < 0 {
+ hash = -hash
+ }
+ return fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash)
+}
diff --git a/pkg/mountmanager/types.go b/pkg/mountmanager/types.go
new file mode 100644
index 0000000..37f791f
--- /dev/null
+++ b/pkg/mountmanager/types.go
@@ -0,0 +1,49 @@
+package mountmanager
+
+// MountRequest contains all information needed to start a weed mount process.
+type MountRequest struct {
+ // VolumeID is required. It is also the default collection name and the
+ // default filer path (/buckets/<VolumeID>) when the volume context sets neither.
+ VolumeID string `json:"volumeId"`
+ // TargetPath is required (checked by validateMountRequest).
+ TargetPath string `json:"targetPath"`
+ // ReadOnly adds the -readOnly flag to the weed mount command line.
+ ReadOnly bool `json:"readOnly"`
+ // Filers must contain at least one entry; entries are joined with commas into -filer.
+ Filers []string `json:"filers"`
+ // CacheDir — NOTE(review): consumed outside the code visible here; presumably the cache root, confirm.
+ CacheDir string `json:"cacheDir"`
+ // CacheCapacityMB is passed as -cacheCapacityMB.
+ CacheCapacityMB int `json:"cacheCapacityMB"`
+ // ConcurrentWriters is passed as -concurrentWriters.
+ ConcurrentWriters int `json:"concurrentWriters"`
+ // UidMap is passed as -map.uid; the flag is omitted when empty.
+ UidMap string `json:"uidMap"`
+ // GidMap is passed as -map.gid; the flag is omitted when empty.
+ GidMap string `json:"gidMap"`
+ // DataCenter must be non-empty whenever a data locality other than None is in effect.
+ DataCenter string `json:"dataCenter"`
+ // DataLocality is parsed with datalocality.FromString; a "dataLocality"
+ // entry in VolumeContext takes precedence over it.
+ DataLocality string `json:"dataLocality"`
+ // VolumeContext holds per-volume parameters. Recognised keys with a
+ // non-empty value override the corresponding flag; unknown keys are logged and skipped.
+ VolumeContext map[string]string `json:"volumeContext"`
+}
+
+// MountResponse is returned after a successful mount request.
+type MountResponse struct {
+ // LocalSocket is presumably the unix socket path of the started weed mount
+ // process (see LocalSocketPath) — confirm against the code that fills it.
+ LocalSocket string `json:"localSocket"`
+}
+
+// UnmountRequest contains the information needed to stop a weed mount process.
+type UnmountRequest struct {
+ // VolumeID names the volume whose mount should be stopped.
+ VolumeID string `json:"volumeId"`
+}
+
+// UnmountResponse is the response of a successful unmount request. It carries no fields.
+type UnmountResponse struct{}
+
+// ConfigureRequest adjusts the behaviour of an existing mount.
+type ConfigureRequest struct {
+ // VolumeID selects the mount to configure; the volume must currently be mounted.
+ VolumeID string `json:"volumeId"`
+ // CollectionCapacity is forwarded to the weed mount process's Configure RPC.
+ // A value of exactly 1 is sent as 0.
+ CollectionCapacity int64 `json:"collectionCapacity"`
+}
+
+// ConfigureResponse represents a successful configure call. It carries no fields.
+type ConfigureResponse struct{}
+
+// ErrorResponse is returned when the mount service encounters a failure.
+type ErrorResponse struct {
+ // Error is the failure description, serialised under the "error" JSON key.
+ Error string `json:"error"`
+}
+
+const (
+ // DefaultWeedBinary is the default executable name used to spawn weed mount processes.
+ // It is a bare name with no directory component.
+ // NOTE(review): presumably resolved via PATH when the process is started — confirm at the call site.
+ DefaultWeedBinary = "weed"
+)