diff options
| author | 泽淼 周 <zhouzemiao@ihuman.com> | 2025-09-26 19:54:44 +0800 |
|---|---|---|
| committer | chrislusf <chris.lu@gmail.com> | 2025-12-05 17:51:28 -0800 |
| commit | bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd (patch) | |
| tree | 7a60daa0bc9cf86204ddeddc94bc7dc748483cb5 | |
| parent | fd2b35494095ccf7b06fb210305406f83ed17998 (diff) | |
| download | seaweedfs-csi-driver-bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd.tar.xz seaweedfs-csi-driver-bedb15e3e3742a4bd4e26e71e6d9818e9e1474bd.zip | |
feat: separate weed mount lifecycle into a dedicated service and rewire the CSI components to call it
| -rw-r--r-- | cmd/seaweedfs-csi-driver/main.go | 3 | ||||
| -rw-r--r-- | cmd/seaweedfs-mount/Dockerfile | 20 | ||||
| -rw-r--r-- | cmd/seaweedfs-mount/main.go | 153 | ||||
| -rw-r--r-- | pkg/driver/driver.go | 6 | ||||
| -rw-r--r-- | pkg/driver/mounter.go | 150 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 188 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 21 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 28 | ||||
| -rw-r--r-- | pkg/mountmanager/client.go | 110 | ||||
| -rw-r--r-- | pkg/mountmanager/endpoint.go | 15 | ||||
| -rw-r--r-- | pkg/mountmanager/keymutex.go | 21 | ||||
| -rw-r--r-- | pkg/mountmanager/manager.go | 490 | ||||
| -rw-r--r-- | pkg/mountmanager/socket.go | 16 | ||||
| -rw-r--r-- | pkg/mountmanager/types.go | 49 |
14 files changed, 951 insertions, 319 deletions
diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go index f749db6..45bb810 100644 --- a/cmd/seaweedfs-csi-driver/main.go +++ b/cmd/seaweedfs-csi-driver/main.go @@ -19,6 +19,7 @@ var ( filer = flag.String("filer", "localhost:8888", "filer server") endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls") + mountEndpoint = flag.String("mountEndpoint", "unix:///tmp/seaweedfs-mount.sock", "mount service endpoint") nodeID = flag.String("nodeid", "", "node id") version = flag.Bool("version", false, "Print the version and exit.") concurrentWriters = flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0") @@ -78,7 +79,7 @@ func main() { glog.Infof("connect to filer %s", *filer) - drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *enableAttacher) + drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *mountEndpoint, *enableAttacher) drv.RunNode = runNode drv.RunController = runController diff --git a/cmd/seaweedfs-mount/Dockerfile b/cmd/seaweedfs-mount/Dockerfile new file mode 100644 index 0000000..689037b --- /dev/null +++ b/cmd/seaweedfs-mount/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.23-alpine AS builder + +RUN apk add git g++ + +RUN mkdir -p /go/src/github.com/seaweedfs/ +RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs +RUN cd /go/src/github.com/seaweedfs/seaweedfs/weed && go install + +RUN mkdir -p /go/src/github.com/zemul/ +RUN git clone https://github.com/zemul/seaweedfs-csi-driver /go/src/github.com/zemul/seaweedfs-csi-driver +RUN cd /go/src/github.com/zemul/seaweedfs-csi-driver && \ + go build -ldflags="-s -w" -o /seaweedfs-mount ./cmd/seaweedfs-mount/main.go + +FROM alpine AS final +RUN apk add fuse +COPY --from=builder /go/bin/weed /usr/bin/ +COPY --from=builder /seaweedfs-mount / + +RUN chmod +x /seaweedfs-mount +ENTRYPOINT ["/seaweedfs-mount"]
\ No newline at end of file diff --git a/cmd/seaweedfs-mount/main.go b/cmd/seaweedfs-mount/main.go new file mode 100644 index 0000000..174b530 --- /dev/null +++ b/cmd/seaweedfs-mount/main.go @@ -0,0 +1,153 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +var ( + endpoint = flag.String("endpoint", "unix:///tmp/seaweedfs-mount.sock", "endpoint the mount service listens on") + weedBinary = flag.String("weedBinary", mountmanager.DefaultWeedBinary, "path to the weed binary") +) + +func main() { + flag.Parse() + + scheme, address, err := mountmanager.ParseEndpoint(*endpoint) + if err != nil { + glog.Fatalf("invalid endpoint: %v", err) + } + if scheme != "unix" { + glog.Fatalf("unsupported endpoint scheme: %s", scheme) + } + + if err := os.Remove(address); err != nil && !errors.Is(err, os.ErrNotExist) { + glog.Fatalf("removing existing socket: %v", err) + } + + listener, err := net.Listen("unix", address) + if err != nil { + glog.Fatalf("failed to listen on %s: %v", address, err) + } + defer func() { + _ = listener.Close() + _ = os.Remove(address) + }() + + manager := mountmanager.NewManager(mountmanager.Config{WeedBinary: *weedBinary}) + + mux := http.NewServeMux() + mux.HandleFunc("/mount", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + var req mountmanager.MountRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + + resp, err := manager.Mount(&req) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + writeJSON(w, http.StatusOK, resp) + }) + + mux.HandleFunc("/unmount", func(w http.ResponseWriter, r 
*http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + var req mountmanager.UnmountRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + + resp, err := manager.Unmount(&req) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + writeJSON(w, http.StatusOK, resp) + }) + + mux.HandleFunc("/configure", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + var req mountmanager.ConfigureRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + + resp, err := manager.Configure(&req) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + writeJSON(w, http.StatusOK, resp) + }) + + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + server := &http.Server{Handler: mux} + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + go func() { + if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + glog.Fatalf("server error: %v", err) + } + }() + + glog.Infof("mount service listening on %s", *endpoint) + + <-ctx.Done() + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + glog.Errorf("server shutdown error: %v", err) + } + + glog.Infof("mount service stopped") +} + +func writeJSON(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := 
json.NewEncoder(w).Encode(data); err != nil { + glog.Errorf("writing response failed: %v", err) + } +} + +func writeError(w http.ResponseWriter, status int, message string) { + writeJSON(w, status, mountmanager.ErrorResponse{Error: message}) +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 4e09a93..d162369 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -27,7 +27,8 @@ type SeaweedFsDriver struct { nodeID string version string - endpoint string + endpoint string + mountEndpoint string vcap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability @@ -48,7 +49,7 @@ type SeaweedFsDriver struct { RunController bool } -func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher bool) *SeaweedFsDriver { +func NewSeaweedFsDriver(name, filer, nodeID, endpoint, mountEndpoint string, enableAttacher bool) *SeaweedFsDriver { glog.Infof("Driver: %v version: %v", name, version) @@ -56,6 +57,7 @@ func NewSeaweedFsDriver(name, filer, nodeID, endpoint string, enableAttacher boo n := &SeaweedFsDriver{ endpoint: endpoint, + mountEndpoint: mountEndpoint, nodeID: nodeID, name: name, version: version, diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index 756f671..6beee55 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -1,24 +1,12 @@ package driver import ( - "context" "fmt" - "os" - "syscall" - "time" - - "os/exec" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" - "k8s.io/mount-utils" ) -// Config holds values to configure the driver -type Config struct { - // Region string - Filer string -} - type Unmounter interface { Unmount() error } @@ -27,107 +15,81 @@ type Mounter interface { Mount(target string) (Unmounter, error) } -type fuseUnmounter struct { - path string - cmd *exec.Cmd +type mountServiceMounter struct { + driver *SeaweedFsDriver + volumeID string + readOnly bool + volContext map[string]string + client *mountmanager.Client 
+} - finished chan struct{} +type mountServiceUnmounter struct { + client *mountmanager.Client + volumeID string } func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - path, ok := volContext["path"] - if !ok { - path = fmt.Sprintf("/buckets/%s", volumeID) + client, err := mountmanager.NewClient(driver.mountEndpoint) + if err != nil { + return nil, err } - collection, ok := volContext["collection"] - if !ok { - collection = volumeID + contextCopy := make(map[string]string, len(volContext)) + for k, v := range volContext { + contextCopy[k] = v } - return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext) + return &mountServiceMounter{ + driver: driver, + volumeID: volumeID, + readOnly: readOnly, + volContext: contextCopy, + client: client, + }, nil } -func fuseMount(path string, command string, args []string) (Unmounter, error) { - cmd := exec.Command(command, args...) - glog.V(0).Infof("Mounting fuse with command: %s and args: %s", command, args) - - // log fuse process messages - we need an easy way to investigate crashes in case it happens - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout - - err := cmd.Start() - if err != nil { - glog.Errorf("running weed mount: %v", err) - return nil, fmt.Errorf("error fuseMount command: %s\nargs: %s\nerror: %v", command, args, err) +func (m *mountServiceMounter) Mount(target string) (Unmounter, error) { + if target == "" { + return nil, fmt.Errorf("target path is required") } - fu := &fuseUnmounter{ - path: path, - cmd: cmd, - - finished: make(chan struct{}), + filers := make([]string, len(m.driver.filers)) + for i, address := range m.driver.filers { + filers[i] = string(address) } - // avoid zombie processes - go func() { - if err := cmd.Wait(); err != nil { - glog.Errorf("weed mount exit, pid: %d, path: %v, error: %v", cmd.Process.Pid, path, err) - } else { - glog.Infof("weed mount exit, pid: %d, path: %v", cmd.Process.Pid, path) 
- } - - // make sure we'll have no stale mounts - time.Sleep(time.Millisecond * 100) - _ = mountutil.Unmount(path) - - close(fu.finished) - }() - - if err = waitForMount(path, 10*time.Second); err != nil { - glog.Errorf("weed mount timeout, pid: %d, path: %v", cmd.Process.Pid, path) + req := &mountmanager.MountRequest{ + VolumeID: m.volumeID, + TargetPath: target, + ReadOnly: m.readOnly, + Filers: filers, + CacheDir: m.driver.CacheDir, + CacheCapacityMB: m.driver.CacheCapacityMB, + ConcurrentWriters: m.driver.ConcurrentWriters, + UidMap: m.driver.UidMap, + GidMap: m.driver.GidMap, + DataCenter: m.driver.DataCenter, + DataLocality: m.driver.DataLocality.String(), + VolumeContext: m.volContext, + } - _ = fu.finish(time.Second * 10) + resp, err := m.client.Mount(req) + if err != nil { return nil, err - } else { - return fu, nil } -} -func (fu *fuseUnmounter) finish(timeout time.Duration) error { - // ignore error, just inform we want process to exit - // SIGHUP is used to reload weed config - we need to use SIGTERM - _ = fu.cmd.Process.Signal(syscall.SIGTERM) - - if err := fu.waitFinished(timeout); err != nil { - glog.Errorf("weed mount terminate timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - _ = fu.cmd.Process.Kill() - if err = fu.waitFinished(time.Second * 1); err != nil { - glog.Errorf("weed mount kill timeout, pid: %d, path: %v", fu.cmd.Process.Pid, fu.path) - return err - } + expectedSocket := mountmanager.LocalSocketPath(m.volumeID) + if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket { + glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket) } - return nil + return &mountServiceUnmounter{ + client: m.client, + volumeID: m.volumeID, + }, nil } -func (fu *fuseUnmounter) waitFinished(timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - select { - case <-ctx.Done(): - return context.DeadlineExceeded - case 
<-fu.finished: - return nil - } -} - -func (fu *fuseUnmounter) Unmount() error { - if ok, err := mountutil.IsMountPoint(fu.path); ok || mount.IsCorruptedMnt(err) { - if err := mountutil.Unmount(fu.path); err != nil { - return err - } - } - - return fu.finish(time.Second * 5) +func (u *mountServiceUnmounter) Unmount() error { + _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID}) + return err } diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go deleted file mode 100644 index 56ba98b..0000000 --- a/pkg/driver/mounter_seaweedfs.go +++ /dev/null @@ -1,188 +0,0 @@ -package driver - -import ( - "fmt" - "os" - "path/filepath" - "strings" - - "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/util" -) - -// Implements Mounter -type seaweedFsMounter struct { - volumeID string - path string - collection string - readOnly bool - driver *SeaweedFsDriver - volContext map[string]string -} - -type seaweedFsUnmounter struct { - unmounter Unmounter - cacheDir string -} - -const ( - seaweedFsCmd = "weed" -) - -func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - return &seaweedFsMounter{ - volumeID: volumeID, - path: path, - collection: collection, - readOnly: readOnly, - driver: driver, - volContext: volContext, - }, nil -} - -func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { - glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target) - - var filers []string - for _, address := range seaweedFs.driver.filers { - filers = append(filers, string(address)) - } - - // CacheDirForRead should be always defined - we use temp dir in case it is not defined - // we need to use predictable cache path, because we need to clean it up on unstage - cacheDir := 
filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID) - - // Final args - args := []string{ - "-logtostderr=true", - "mount", - "-dirAutoCreate=true", - "-umask=000", - fmt.Sprintf("-dir=%s", target), - fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)), - fmt.Sprintf("-cacheDir=%s", cacheDir), - } - - if seaweedFs.readOnly { - args = append(args, "-readOnly") - } - - // Values for override-able args - // Whitelist for merging with volContext - argsMap := map[string]string{ - "collection": seaweedFs.collection, - "filer": strings.Join(filers, ","), - "filer.path": seaweedFs.path, - "cacheCapacityMB": fmt.Sprint(seaweedFs.driver.CacheCapacityMB), - "concurrentWriters": fmt.Sprint(seaweedFs.driver.ConcurrentWriters), - "map.uid": seaweedFs.driver.UidMap, - "map.gid": seaweedFs.driver.GidMap, - "disk": "", - "dataCenter": "", - "replication": "", - "ttl": "", - "chunkSizeLimitMB": "", - "volumeServerAccess": "", - "readRetryTime": "", - } - - // Handle DataLocality - dataLocality := seaweedFs.driver.DataLocality - // Try to override when set in context - if dataLocalityStr, ok := seaweedFs.volContext["dataLocality"]; ok { - // Convert to enum - dataLocalityRes, ok := datalocality.FromString(dataLocalityStr) - if !ok { - glog.Warning("volumeContext 'dataLocality' invalid") - } else { - dataLocality = dataLocalityRes - } - } - if err := CheckDataLocality(&dataLocality, &seaweedFs.driver.DataCenter); err != nil { - return nil, err - } - // Settings based on type - switch dataLocality { - case datalocality.Write_preferLocalDc: - argsMap["dataCenter"] = seaweedFs.driver.DataCenter - } - - // volContext-parameter -> mount-arg - parameterArgMap := map[string]string{ - "uidMap": "map.uid", - "gidMap": "map.gid", - "filerPath": "filer.path", - // volumeContext has "diskType", but mount-option is "disk", converting for backwards compatability - "diskType": "disk", - } - - // Explicitly ignored volContext args e.g. 
handled somewhere else - ignoreArgs := []string{ - "dataLocality", - } - - // Merge volContext into argsMap with key-mapping - for arg, value := range seaweedFs.volContext { - if in_arr(ignoreArgs, arg) { - continue - } - - // Check if key-mapping exists - newArg, ok := parameterArgMap[arg] - if ok { - arg = newArg - } - - // Check if arg can be applied - if _, ok := argsMap[arg]; !ok { - glog.Warningf("VolumeContext '%s' ignored", arg) - continue - } - - // Write to args-map - argsMap[arg] = value - } - - // Convert Args-Map to args - for arg, value := range argsMap { - if value != "" { // ignore empty values - args = append(args, fmt.Sprintf("-%s=%s", arg, value)) - } - } - - u, err := fuseMount(target, seaweedFsCmd, args) - if err != nil { - glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err) - } - - return &seaweedFsUnmounter{unmounter: u, cacheDir: cacheDir}, err -} - -func (su *seaweedFsUnmounter) Unmount() error { - err := su.unmounter.Unmount() - err2 := os.RemoveAll(su.cacheDir) - if err2 != nil { - glog.Warningf("error removing cache from: %s, err: %v", su.cacheDir, err2) - } - return err -} - -func GetLocalSocket(volumeID string) string { - montDirHash := util.HashToInt32([]byte(volumeID)) - if montDirHash < 0 { - montDirHash = -montDirHash - } - - socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash) - return socket -} - -func in_arr(arr []string, val string) bool { - for _, v := range arr { - if val == v { - return true - } - } - return false -} diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 21e68bf..9371d09 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -185,11 +185,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // Note: The returned Volume won't have an unmounter, so Unstage will need special handling. 
func (ns *NodeServer) rebuildVolumeFromStaging(volumeID string, stagingPath string) *Volume { return &Volume{ - VolumeId: volumeID, - StagedPath: stagingPath, - localSocket: GetLocalSocket(volumeID), + VolumeId: volumeID, + StagedPath: stagingPath, + driver: ns.Driver, // mounter and unmounter are nil - this is intentional // The FUSE process is already running, we just need to track the volume + // The mount service will have the mount tracked if it's still alive } } @@ -344,17 +345,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV } func (ns *NodeServer) NodeCleanup() { - ns.volumes.Range(func(_, vol any) bool { - v := vol.(*Volume) - if len(v.StagedPath) > 0 { - glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) - err := v.Unstage(v.StagedPath) - if err != nil { - glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) - } - } - return true - }) + glog.Infof("node cleanup skipped; mount service retains mounts across restarts") } func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex { @@ -373,7 +364,7 @@ func (ns *NodeServer) stageNewVolume(volumeID, stagingTargetPath string, volCont return nil, err } - volume := NewVolume(volumeID, mounter) + volume := NewVolume(volumeID, mounter, ns.Driver) if err := volume.Stage(stagingTargetPath); err != nil { return nil, err } diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 6cc777b..080595e 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -1,14 +1,10 @@ package driver import ( - "context" - "fmt" "os" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "k8s.io/mount-utils" ) @@ -18,16 +14,14 @@ type Volume struct { mounter Mounter unmounter Unmounter - - // unix socket used to manage volume - localSocket string + driver 
*SeaweedFsDriver } -func NewVolume(volumeID string, mounter Mounter) *Volume { +func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume { return &Volume{ - VolumeId: volumeID, - mounter: mounter, - localSocket: GetLocalSocket(volumeID), + VolumeId: volumeID, + mounter: mounter, + driver: driver, } } @@ -81,22 +75,18 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly } func (vol *Volume) Quota(sizeByte int64) error { - target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) - dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - - clientConn, err := grpc.Dial(target, dialOption) + client, err := mountmanager.NewClient(vol.driver.mountEndpoint) if err != nil { return err } - defer clientConn.Close() // We can't create PV of zero size, so we're using quota of 1 byte to define no quota. if sizeByte == 1 { sizeByte = 0 } - client := mount_pb.NewSeaweedMountClient(clientConn) - _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ + _, err = client.Configure(&mountmanager.ConfigureRequest{ + VolumeID: vol.VolumeId, CollectionCapacity: sizeByte, }) return err diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go new file mode 100644 index 0000000..8a53d00 --- /dev/null +++ b/pkg/mountmanager/client.go @@ -0,0 +1,110 @@ +package mountmanager + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "time" +) + +// Client talks to the mount service over a Unix domain socket. +type Client struct { + httpClient *http.Client + baseURL string +} + +// NewClient builds a new Client for the given endpoint. 
+func NewClient(endpoint string) (*Client, error) { + scheme, address, err := ParseEndpoint(endpoint) + if err != nil { + return nil, err + } + if scheme != "unix" { + return nil, fmt.Errorf("unsupported endpoint scheme: %s", scheme) + } + + dialer := &net.Dialer{} + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "unix", address) + }, + } + + return &Client{ + httpClient: &http.Client{ + Timeout: 30 * time.Second, + Transport: transport, + }, + baseURL: "http://unix", + }, nil +} + +// Mount mounts a volume using the mount service. +func (c *Client) Mount(req *MountRequest) (*MountResponse, error) { + var resp MountResponse + if err := c.doPost("/mount", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +// Unmount unmounts a volume using the mount service. +func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) { + var resp UnmountResponse + if err := c.doPost("/unmount", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +// Configure updates runtime options such as quota for an existing mount. 
+func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { + var resp ConfigureResponse + if err := c.doPost("/configure", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) doPost(path string, payload any, out any) error { + body := &bytes.Buffer{} + if err := json.NewEncoder(body).Encode(payload); err != nil { + return fmt.Errorf("encode request: %w", err) + } + + req, err := http.NewRequest(http.MethodPost, c.baseURL+path, body) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("call mount service: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + var errResp ErrorResponse + if err := json.NewDecoder(resp.Body).Decode(&errResp); err == nil && errResp.Error != "" { + return errors.New(errResp.Error) + } + data, _ := io.ReadAll(resp.Body) + return fmt.Errorf("mount service error: %s (%s)", resp.Status, string(data)) + } + + if out == nil { + _, _ = io.Copy(io.Discard, resp.Body) + return nil + } + + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode response: %w", err) + } + return nil +} diff --git a/pkg/mountmanager/endpoint.go b/pkg/mountmanager/endpoint.go new file mode 100644 index 0000000..1424536 --- /dev/null +++ b/pkg/mountmanager/endpoint.go @@ -0,0 +1,15 @@ +package mountmanager + +import ( + "fmt" + "strings" +) + +// ParseEndpoint splits an endpoint string like "unix:///path" into scheme and address. 
+func ParseEndpoint(endpoint string) (scheme, address string, err error) { + parts := strings.SplitN(endpoint, "://", 2) + if len(parts) != 2 || parts[1] == "" { + return "", "", fmt.Errorf("invalid endpoint: %s", endpoint) + } + return strings.ToLower(parts[0]), parts[1], nil +} diff --git a/pkg/mountmanager/keymutex.go b/pkg/mountmanager/keymutex.go new file mode 100644 index 0000000..0c64dcc --- /dev/null +++ b/pkg/mountmanager/keymutex.go @@ -0,0 +1,21 @@ +package mountmanager + +import "sync" + +// keyMutex provides a lock per key to serialize operations per volume. +type keyMutex struct { + mutexes sync.Map +} + +func newKeyMutex() *keyMutex { + return &keyMutex{} +} + +func (km *keyMutex) get(key string) *sync.Mutex { + m, _ := km.mutexes.LoadOrStore(key, &sync.Mutex{}) + return m.(*sync.Mutex) +} + +func (km *keyMutex) delete(key string) { + km.mutexes.Delete(key) +} diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go new file mode 100644 index 0000000..0553af5 --- /dev/null +++ b/pkg/mountmanager/manager.go @@ -0,0 +1,490 @@ +package mountmanager + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "k8s.io/mount-utils" +) + +var kubeMounter = mount.New("") + +// Manager owns weed mount processes and exposes helpers to start and stop them. +type Manager struct { + weedBinary string + + mu sync.Mutex + mounts map[string]*mountEntry + locks *keyMutex +} + +// Config configures a Manager instance. +type Config struct { + WeedBinary string +} + +// NewManager returns a Manager ready to accept mount requests. 
+func NewManager(cfg Config) *Manager { + binary := cfg.WeedBinary + if binary == "" { + binary = DefaultWeedBinary + } + return &Manager{ + weedBinary: binary, + mounts: make(map[string]*mountEntry), + locks: newKeyMutex(), + } +} + +// Mount starts a weed mount process using the provided request. +func (m *Manager) Mount(req *MountRequest) (*MountResponse, error) { + if req == nil { + return nil, errors.New("mount request is nil") + } + if err := validateMountRequest(req); err != nil { + return nil, err + } + + lock := m.locks.get(req.VolumeID) + lock.Lock() + defer lock.Unlock() + + if entry := m.getMount(req.VolumeID); entry != nil { + if entry.targetPath == req.TargetPath { + glog.V(1).Infof("volume %s already mounted at %s", req.VolumeID, req.TargetPath) + return &MountResponse{LocalSocket: entry.localSocket}, nil + } + return nil, fmt.Errorf("volume %s already mounted at %s", req.VolumeID, entry.targetPath) + } + + entry, err := m.startMount(req) + if err != nil { + return nil, err + } + + m.mu.Lock() + m.mounts[req.VolumeID] = entry + m.mu.Unlock() + + go m.watchMount(req.VolumeID, entry) + + return &MountResponse{LocalSocket: entry.localSocket}, nil +} + +// Unmount terminates the weed mount process associated with the provided request. 
+func (m *Manager) Unmount(req *UnmountRequest) (*UnmountResponse, error) { + if req == nil { + return nil, errors.New("unmount request is nil") + } + if req.VolumeID == "" { + return nil, errors.New("volumeId is required") + } + + lock := m.locks.get(req.VolumeID) + lock.Lock() + defer lock.Unlock() + + entry := m.removeMount(req.VolumeID) + if entry == nil { + glog.V(1).Infof("volume %s not mounted", req.VolumeID) + return &UnmountResponse{}, nil + } + + defer os.RemoveAll(entry.cacheDir) + + if err := entry.process.stop(); err != nil { + return nil, err + } + + return &UnmountResponse{}, nil +} + +func (m *Manager) getMount(volumeID string) *mountEntry { + m.mu.Lock() + defer m.mu.Unlock() + return m.mounts[volumeID] +} + +func (m *Manager) removeMount(volumeID string) *mountEntry { + m.mu.Lock() + defer m.mu.Unlock() + entry := m.mounts[volumeID] + delete(m.mounts, volumeID) + m.locks.delete(volumeID) + return entry +} + +func (m *Manager) watchMount(volumeID string, entry *mountEntry) { + <-entry.process.done + m.mu.Lock() + current, ok := m.mounts[volumeID] + if ok && current == entry { + delete(m.mounts, volumeID) + } + m.mu.Unlock() + os.RemoveAll(entry.cacheDir) +} + +func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) { + targetPath := req.TargetPath + if err := ensureTargetClean(targetPath); err != nil { + return nil, err + } + + cacheBase := req.CacheDir + if cacheBase == "" { + cacheBase = os.TempDir() + } + cacheDir := filepath.Join(cacheBase, req.VolumeID) + if err := os.MkdirAll(cacheDir, 0750); err != nil { + return nil, fmt.Errorf("creating cache dir: %w", err) + } + + localSocket := LocalSocketPath(req.VolumeID) + args, err := buildMountArgs(req, targetPath, cacheDir, localSocket) + if err != nil { + return nil, err + } + + process, err := startWeedMountProcess(m.weedBinary, args, targetPath) + if err != nil { + return nil, err + } + + return &mountEntry{ + volumeID: req.VolumeID, + targetPath: targetPath, + cacheDir: cacheDir, + 
localSocket: localSocket, + process: process, + }, nil +} + +func ensureTargetClean(targetPath string) error { + isMount, err := kubeMounter.IsMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + return os.MkdirAll(targetPath, 0750) + } + if mount.IsCorruptedMnt(err) { + if err := kubeMounter.Unmount(targetPath); err != nil { + return err + } + return ensureTargetClean(targetPath) + } + return err + } + if isMount { + if err := kubeMounter.Unmount(targetPath); err != nil { + return err + } + } + return nil +} + +func validateMountRequest(req *MountRequest) error { + if req.VolumeID == "" { + return errors.New("volumeId is required") + } + if req.TargetPath == "" { + return errors.New("targetPath is required") + } + if len(req.Filers) == 0 { + return errors.New("at least one filer is required") + } + return nil +} + +func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) { + volumeContext := req.VolumeContext + if volumeContext == nil { + volumeContext = map[string]string{} + } + + path := volumeContext["path"] + if path == "" { + path = fmt.Sprintf("/buckets/%s", req.VolumeID) + } + + collection := volumeContext["collection"] + if collection == "" { + collection = req.VolumeID + } + + args := []string{ + "-logtostderr=true", + "mount", + "-dirAutoCreate=true", + "-umask=000", + fmt.Sprintf("-dir=%s", targetPath), + fmt.Sprintf("-localSocket=%s", localSocket), + fmt.Sprintf("-cacheDir=%s", cacheDir), + } + + if req.ReadOnly { + args = append(args, "-readOnly") + } + + argsMap := map[string]string{ + "collection": collection, + "filer": strings.Join(req.Filers, ","), + "filer.path": path, + "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB), + "concurrentWriters": strconv.Itoa(req.ConcurrentWriters), + "map.uid": req.UidMap, + "map.gid": req.GidMap, + "disk": "", + "dataCenter": "", + "replication": "", + "ttl": "", + "chunkSizeLimitMB": "", + "volumeServerAccess": "", + "readRetryTime": "", + } + + 
dataLocality := datalocality.None + if req.DataLocality != "" { + if dl, ok := datalocality.FromString(req.DataLocality); ok { + dataLocality = dl + } else { + return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality) + } + } + + if contextLocality, ok := volumeContext["dataLocality"]; ok { + if dl, ok := datalocality.FromString(contextLocality); ok { + dataLocality = dl + } else { + return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality) + } + } + + if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil { + return nil, err + } + + switch dataLocality { + case datalocality.Write_preferLocalDc: + argsMap["dataCenter"] = req.DataCenter + } + + parameterArgMap := map[string]string{ + "uidMap": "map.uid", + "gidMap": "map.gid", + "filerPath": "filer.path", + "diskType": "disk", + } + + ignoredArgs := map[string]struct{}{"dataLocality": {}} + + for key, value := range volumeContext { + if _, ignored := ignoredArgs[key]; ignored { + continue + } + if mapped, ok := parameterArgMap[key]; ok { + key = mapped + } + if _, ok := argsMap[key]; !ok { + glog.Warningf("VolumeContext '%s' ignored", key) + continue + } + if value != "" { + argsMap[key] = value + } + } + + for key, value := range argsMap { + if value == "" { + continue + } + args = append(args, fmt.Sprintf("-%s=%s", key, value)) + } + + return args, nil +} + +func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error { + if *dataLocality != datalocality.None && dataCenter == "" { + return fmt.Errorf("dataLocality set, but dataCenter is empty") + } + return nil +} + +// Configure updates mount-level settings, e.g. quota. Intended to be invoked by a trusted caller. 
+func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { + if req == nil { + return nil, errors.New("configure request is nil") + } + + lock := m.locks.get(req.VolumeID) + lock.Lock() + defer lock.Unlock() + + entry := m.getMount(req.VolumeID) + if entry == nil { + m.locks.delete(req.VolumeID) + return nil, fmt.Errorf("volume %s not mounted", req.VolumeID) + } + + client, err := newMountProcessClient(entry.localSocket) + if err != nil { + return nil, err + } + defer client.Close() + + if err := client.Configure(req.CollectionCapacity); err != nil { + return nil, err + } + + return &ConfigureResponse{}, nil +} + +type mountEntry struct { + volumeID string + targetPath string + cacheDir string + localSocket string + process *weedMountProcess +} + +type weedMountProcess struct { + cmd *exec.Cmd + target string + done chan struct{} +} + +func startWeedMountProcess(command string, args []string, target string) (*weedMountProcess, error) { + cmd := exec.Command(command, args...) 
+ cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + glog.V(0).Infof("Starting weed mount: %s %s", command, strings.Join(args, " ")) + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("starting weed mount: %w", err) + } + + process := &weedMountProcess{ + cmd: cmd, + target: target, + done: make(chan struct{}), + } + + go process.wait() + + if err := waitForMount(target, 10*time.Second); err != nil { + _ = process.stop() + return nil, err + } + + return process, nil +} + +func (p *weedMountProcess) wait() { + if err := p.cmd.Wait(); err != nil { + glog.Errorf("weed mount exit (pid: %d, target: %s): %v", p.cmd.Process.Pid, p.target, err) + } else { + glog.Infof("weed mount exit (pid: %d, target: %s)", p.cmd.Process.Pid, p.target) + } + + time.Sleep(100 * time.Millisecond) + _ = kubeMounter.Unmount(p.target) + + close(p.done) +} + +func (p *weedMountProcess) stop() error { + if p.cmd.Process == nil { + return nil + } + + if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil && err != os.ErrProcessDone { + glog.Warningf("sending SIGTERM to weed mount failed: %v", err) + } + + select { + case <-p.done: + return nil + case <-time.After(5 * time.Second): + } + + if err := p.cmd.Process.Kill(); err != nil && err != os.ErrProcessDone { + glog.Warningf("killing weed mount failed: %v", err) + } + + select { + case <-p.done: + return nil + case <-time.After(1 * time.Second): + return errors.New("timed out waiting for weed mount to stop") + } +} + +func waitForMount(path string, timeout time.Duration) error { + var elapsed time.Duration + interval := 10 * time.Millisecond + + for { + notMount, err := kubeMounter.IsLikelyNotMountPoint(path) + if err != nil { + return err + } + if !notMount { + return nil + } + + time.Sleep(interval) + elapsed += interval + if elapsed >= timeout { + return errors.New("timeout waiting for mount") + } + } +} + +type mountProcessClient struct { + conn *grpc.ClientConn + client mount_pb.SeaweedMountClient +} + +func 
newMountProcessClient(localSocket string) (*mountProcessClient, error) { + target := fmt.Sprintf("passthrough:///unix://%s", localSocket) + conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, err) + } + + return &mountProcessClient{ + conn: conn, + client: mount_pb.NewSeaweedMountClient(conn), + }, nil +} + +func (c *mountProcessClient) Close() error { + return c.conn.Close() +} + +func (c *mountProcessClient) Configure(capacity int64) error { + if capacity == 1 { + capacity = 0 + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := c.client.Configure(ctx, &mount_pb.ConfigureRequest{CollectionCapacity: capacity}) + return err +} diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go new file mode 100644 index 0000000..bd63146 --- /dev/null +++ b/pkg/mountmanager/socket.go @@ -0,0 +1,16 @@ +package mountmanager + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// LocalSocketPath returns the unix socket path used to communicate with the weed mount process. +func LocalSocketPath(volumeID string) string { + hash := util.HashToInt32([]byte(volumeID)) + if hash < 0 { + hash = -hash + } + return fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash) +} diff --git a/pkg/mountmanager/types.go b/pkg/mountmanager/types.go new file mode 100644 index 0000000..37f791f --- /dev/null +++ b/pkg/mountmanager/types.go @@ -0,0 +1,49 @@ +package mountmanager + +// MountRequest contains all information needed to start a weed mount process. 
type MountRequest struct {
	VolumeID          string            `json:"volumeId"`          // CSI volume ID; also default collection / bucket name
	TargetPath        string            `json:"targetPath"`        // directory where the FUSE mount is created
	ReadOnly          bool              `json:"readOnly"`          // mount the volume read-only
	Filers            []string          `json:"filers"`            // filer addresses, at least one required
	CacheDir          string            `json:"cacheDir"`          // base directory for the per-volume chunk cache
	CacheCapacityMB   int               `json:"cacheCapacityMB"`   // chunk cache size limit in MB
	ConcurrentWriters int               `json:"concurrentWriters"` // limit on concurrent writer goroutines
	UidMap            string            `json:"uidMap"`            // uid remapping spec passed to weed mount
	GidMap            string            `json:"gidMap"`            // gid remapping spec passed to weed mount
	DataCenter        string            `json:"dataCenter"`        // data center for locality-aware writes
	DataLocality      string            `json:"dataLocality"`      // data locality policy name
	VolumeContext     map[string]string `json:"volumeContext"`     // CSI volume context overrides
}

// MountResponse is returned after a successful mount request.
type MountResponse struct {
	// LocalSocket is the unix socket of the spawned weed mount process.
	LocalSocket string `json:"localSocket"`
}

// UnmountRequest contains the information needed to stop a weed mount process.
type UnmountRequest struct {
	VolumeID string `json:"volumeId"`
}

// UnmountResponse is the response of a successful unmount request.
type UnmountResponse struct{}

// ConfigureRequest adjusts the behaviour of an existing mount.
type ConfigureRequest struct {
	VolumeID           string `json:"volumeId"`
	CollectionCapacity int64  `json:"collectionCapacity"`
}

// ConfigureResponse represents a successful configure call.
type ConfigureResponse struct{}

// ErrorResponse is returned when the mount service encounters a failure.
type ErrorResponse struct {
	Error string `json:"error"`
}

const (
	// DefaultWeedBinary is the default executable name used to spawn weed mount processes.
	DefaultWeedBinary = "weed"
)
