aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/seaweedfs-mount/main.go21
-rw-r--r--pkg/driver/mounter.go144
-rw-r--r--pkg/driver/volume.go27
-rw-r--r--pkg/mountmanager/client.go9
-rw-r--r--pkg/mountmanager/manager.go210
-rw-r--r--pkg/mountmanager/socket.go2
-rw-r--r--pkg/mountmanager/types.go26
7 files changed, 168 insertions, 271 deletions
diff --git a/cmd/seaweedfs-mount/main.go b/cmd/seaweedfs-mount/main.go
index 174b530..12151e8 100644
--- a/cmd/seaweedfs-mount/main.go
+++ b/cmd/seaweedfs-mount/main.go
@@ -90,27 +90,6 @@ func main() {
writeJSON(w, http.StatusOK, resp)
})
- mux.HandleFunc("/configure", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- writeError(w, http.StatusMethodNotAllowed, "method not allowed")
- return
- }
-
- var req mountmanager.ConfigureRequest
- if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
- writeError(w, http.StatusBadRequest, "invalid request: "+err.Error())
- return
- }
-
- resp, err := manager.Configure(&req)
- if err != nil {
- writeError(w, http.StatusBadRequest, err.Error())
- return
- }
-
- writeJSON(w, http.StatusOK, resp)
- })
-
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index 6beee55..d86d82e 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -2,7 +2,12 @@ package driver
import (
"fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
"github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
@@ -58,29 +63,29 @@ func (m *mountServiceMounter) Mount(target string) (Unmounter, error) {
filers[i] = string(address)
}
- req := &mountmanager.MountRequest{
- VolumeID: m.volumeID,
- TargetPath: target,
- ReadOnly: m.readOnly,
- Filers: filers,
- CacheDir: m.driver.CacheDir,
- CacheCapacityMB: m.driver.CacheCapacityMB,
- ConcurrentWriters: m.driver.ConcurrentWriters,
- UidMap: m.driver.UidMap,
- GidMap: m.driver.GidMap,
- DataCenter: m.driver.DataCenter,
- DataLocality: m.driver.DataLocality.String(),
- VolumeContext: m.volContext,
- }
-
- resp, err := m.client.Mount(req)
+ cacheBase := m.driver.CacheDir
+ if cacheBase == "" {
+ cacheBase = os.TempDir()
+ }
+ cacheDir := filepath.Join(cacheBase, m.volumeID)
+ localSocket := mountmanager.LocalSocketPath(m.volumeID)
+
+ args, err := m.buildMountArgs(target, cacheDir, localSocket, filers)
if err != nil {
return nil, err
}
- expectedSocket := mountmanager.LocalSocketPath(m.volumeID)
- if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket {
- glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket)
+ req := &mountmanager.MountRequest{
+ VolumeID: m.volumeID,
+ TargetPath: target,
+ CacheDir: cacheDir,
+ MountArgs: args,
+ LocalSocket: localSocket,
+ }
+
+ _, err = m.client.Mount(req)
+ if err != nil {
+ return nil, err
}
return &mountServiceUnmounter{
@@ -93,3 +98,104 @@ func (u *mountServiceUnmounter) Unmount() error {
_, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID})
return err
}
+
+func (m *mountServiceMounter) buildMountArgs(targetPath, cacheDir, localSocket string, filers []string) ([]string, error) {
+ volumeContext := m.volContext
+ if volumeContext == nil {
+ volumeContext = map[string]string{}
+ }
+
+ path := volumeContext["path"]
+ if path == "" {
+ path = fmt.Sprintf("/buckets/%s", m.volumeID)
+ }
+
+ collection := volumeContext["collection"]
+ if collection == "" {
+ collection = m.volumeID
+ }
+
+ args := []string{
+ "-logtostderr=true",
+ "mount",
+ "-dirAutoCreate=true",
+ "-umask=000",
+ fmt.Sprintf("-dir=%s", targetPath),
+ fmt.Sprintf("-localSocket=%s", localSocket),
+ fmt.Sprintf("-cacheDir=%s", cacheDir),
+ }
+
+ if m.readOnly {
+ args = append(args, "-readOnly")
+ }
+
+ argsMap := map[string]string{
+ "collection": collection,
+ "filer": strings.Join(filers, ","),
+ "filer.path": path,
+ "cacheCapacityMB": strconv.Itoa(m.driver.CacheCapacityMB),
+ "concurrentWriters": strconv.Itoa(m.driver.ConcurrentWriters),
+ "map.uid": m.driver.UidMap,
+ "map.gid": m.driver.GidMap,
+ "disk": "",
+ "dataCenter": "",
+ "replication": "",
+ "ttl": "",
+ "chunkSizeLimitMB": "",
+ "volumeServerAccess": "",
+ "readRetryTime": "",
+ }
+
+ dataLocality := m.driver.DataLocality
+ if contextLocality, ok := volumeContext["dataLocality"]; ok && contextLocality != "" {
+ if dl, ok := datalocality.FromString(contextLocality); ok {
+ dataLocality = dl
+ } else {
+ return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality)
+ }
+ }
+
+ dataCenter := m.driver.DataCenter
+ if err := CheckDataLocality(&dataLocality, &dataCenter); err != nil {
+ return nil, err
+ }
+
+ switch dataLocality {
+ case datalocality.Write_preferLocalDc:
+ argsMap["dataCenter"] = dataCenter
+ }
+
+ parameterArgMap := map[string]string{
+ "uidMap": "map.uid",
+ "gidMap": "map.gid",
+ "filerPath": "filer.path",
+ "diskType": "disk",
+ }
+
+ ignoredArgs := map[string]struct{}{"dataLocality": {}}
+
+ for key, value := range volumeContext {
+ if _, ignored := ignoredArgs[key]; ignored {
+ continue
+ }
+ if mapped, ok := parameterArgMap[key]; ok {
+ key = mapped
+ }
+ if _, ok := argsMap[key]; !ok {
+ glog.Warningf("VolumeContext '%s' ignored", key)
+ continue
+ }
+ if value != "" {
+ argsMap[key] = value
+ }
+ }
+
+ for key, value := range argsMap {
+ if value == "" {
+ continue
+ }
+ args = append(args, fmt.Sprintf("-%s=%s", key, value))
+ }
+
+ return args, nil
+}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 080595e..9e258a8 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -1,10 +1,15 @@
package driver
import (
+ "context"
+ "fmt"
"os"
"github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)
@@ -14,14 +19,18 @@ type Volume struct {
mounter Mounter
unmounter Unmounter
- driver *SeaweedFsDriver
+
+ // unix socket used to manage volume
+ localSocket string
+ driver *SeaweedFsDriver
}
func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume {
return &Volume{
- VolumeId: volumeID,
- mounter: mounter,
- driver: driver,
+ VolumeId: volumeID,
+ mounter: mounter,
+ localSocket: mountmanager.LocalSocketPath(volumeID),
+ driver: driver,
}
}
@@ -75,18 +84,22 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly
}
func (vol *Volume) Quota(sizeByte int64) error {
- client, err := mountmanager.NewClient(vol.driver.mountEndpoint)
+ target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
+ dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+ clientConn, err := grpc.Dial(target, dialOption)
if err != nil {
return err
}
+ defer clientConn.Close()
// We can't create PV of zero size, so we're using quota of 1 byte to define no quota.
if sizeByte == 1 {
sizeByte = 0
}
- _, err = client.Configure(&mountmanager.ConfigureRequest{
- VolumeID: vol.VolumeId,
+ client := mount_pb.NewSeaweedMountClient(clientConn)
+ _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
CollectionCapacity: sizeByte,
})
return err
diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go
index 8a53d00..6288c44 100644
--- a/pkg/mountmanager/client.go
+++ b/pkg/mountmanager/client.go
@@ -62,15 +62,6 @@ func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) {
return &resp, nil
}
-// Configure updates runtime options such as quota for an existing mount.
-func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
- var resp ConfigureResponse
- if err := c.doPost("/configure", req, &resp); err != nil {
- return nil, err
- }
- return &resp, nil
-}
-
func (c *Client) doPost(path string, payload any, out any) error {
body := &bytes.Buffer{}
if err := json.NewEncoder(body).Encode(payload); err != nil {
diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go
index 0553af5..26dda18 100644
--- a/pkg/mountmanager/manager.go
+++ b/pkg/mountmanager/manager.go
@@ -1,23 +1,16 @@
package mountmanager
import (
- "context"
"errors"
"fmt"
"os"
"os/exec"
- "path/filepath"
- "strconv"
"strings"
"sync"
"syscall"
"time"
- "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)
@@ -145,19 +138,22 @@ func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) {
return nil, err
}
- cacheBase := req.CacheDir
- if cacheBase == "" {
- cacheBase = os.TempDir()
+ cacheDir := req.CacheDir
+ if cacheDir == "" {
+ return nil, errors.New("cacheDir is required")
}
- cacheDir := filepath.Join(cacheBase, req.VolumeID)
if err := os.MkdirAll(cacheDir, 0750); err != nil {
return nil, fmt.Errorf("creating cache dir: %w", err)
}
- localSocket := LocalSocketPath(req.VolumeID)
- args, err := buildMountArgs(req, targetPath, cacheDir, localSocket)
- if err != nil {
- return nil, err
+ localSocket := req.LocalSocket
+ if localSocket == "" {
+ return nil, errors.New("localSocket is required")
+ }
+
+ args := req.MountArgs
+ if len(args) == 0 {
+ return nil, errors.New("mountArgs is required")
}
process, err := startWeedMountProcess(m.weedBinary, args, targetPath)
@@ -203,156 +199,18 @@ func validateMountRequest(req *MountRequest) error {
if req.TargetPath == "" {
return errors.New("targetPath is required")
}
- if len(req.Filers) == 0 {
- return errors.New("at least one filer is required")
- }
- return nil
-}
-
-func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) {
- volumeContext := req.VolumeContext
- if volumeContext == nil {
- volumeContext = map[string]string{}
- }
-
- path := volumeContext["path"]
- if path == "" {
- path = fmt.Sprintf("/buckets/%s", req.VolumeID)
- }
-
- collection := volumeContext["collection"]
- if collection == "" {
- collection = req.VolumeID
- }
-
- args := []string{
- "-logtostderr=true",
- "mount",
- "-dirAutoCreate=true",
- "-umask=000",
- fmt.Sprintf("-dir=%s", targetPath),
- fmt.Sprintf("-localSocket=%s", localSocket),
- fmt.Sprintf("-cacheDir=%s", cacheDir),
- }
-
- if req.ReadOnly {
- args = append(args, "-readOnly")
- }
-
- argsMap := map[string]string{
- "collection": collection,
- "filer": strings.Join(req.Filers, ","),
- "filer.path": path,
- "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB),
- "concurrentWriters": strconv.Itoa(req.ConcurrentWriters),
- "map.uid": req.UidMap,
- "map.gid": req.GidMap,
- "disk": "",
- "dataCenter": "",
- "replication": "",
- "ttl": "",
- "chunkSizeLimitMB": "",
- "volumeServerAccess": "",
- "readRetryTime": "",
+ if req.CacheDir == "" {
+ return errors.New("cacheDir is required")
}
-
- dataLocality := datalocality.None
- if req.DataLocality != "" {
- if dl, ok := datalocality.FromString(req.DataLocality); ok {
- dataLocality = dl
- } else {
- return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality)
- }
+ if req.LocalSocket == "" {
+ return errors.New("localSocket is required")
}
-
- if contextLocality, ok := volumeContext["dataLocality"]; ok {
- if dl, ok := datalocality.FromString(contextLocality); ok {
- dataLocality = dl
- } else {
- return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality)
- }
- }
-
- if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil {
- return nil, err
- }
-
- switch dataLocality {
- case datalocality.Write_preferLocalDc:
- argsMap["dataCenter"] = req.DataCenter
- }
-
- parameterArgMap := map[string]string{
- "uidMap": "map.uid",
- "gidMap": "map.gid",
- "filerPath": "filer.path",
- "diskType": "disk",
- }
-
- ignoredArgs := map[string]struct{}{"dataLocality": {}}
-
- for key, value := range volumeContext {
- if _, ignored := ignoredArgs[key]; ignored {
- continue
- }
- if mapped, ok := parameterArgMap[key]; ok {
- key = mapped
- }
- if _, ok := argsMap[key]; !ok {
- glog.Warningf("VolumeContext '%s' ignored", key)
- continue
- }
- if value != "" {
- argsMap[key] = value
- }
- }
-
- for key, value := range argsMap {
- if value == "" {
- continue
- }
- args = append(args, fmt.Sprintf("-%s=%s", key, value))
- }
-
- return args, nil
-}
-
-func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error {
- if *dataLocality != datalocality.None && dataCenter == "" {
- return fmt.Errorf("dataLocality set, but dataCenter is empty")
+ if len(req.MountArgs) == 0 {
+ return errors.New("mountArgs is required")
}
return nil
}
-// Configure updates mount-level settings, e.g. quota. Intended to be invoked by a trusted caller.
-func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
- if req == nil {
- return nil, errors.New("configure request is nil")
- }
-
- lock := m.locks.get(req.VolumeID)
- lock.Lock()
- defer lock.Unlock()
-
- entry := m.getMount(req.VolumeID)
- if entry == nil {
- m.locks.delete(req.VolumeID)
- return nil, fmt.Errorf("volume %s not mounted", req.VolumeID)
- }
-
- client, err := newMountProcessClient(entry.localSocket)
- if err != nil {
- return nil, err
- }
- defer client.Close()
-
- if err := client.Configure(req.CollectionCapacity); err != nil {
- return nil, err
- }
-
- return &ConfigureResponse{}, nil
-}
-
type mountEntry struct {
volumeID string
targetPath string
@@ -454,37 +312,3 @@ func waitForMount(path string, timeout time.Duration) error {
}
}
}
-
-type mountProcessClient struct {
- conn *grpc.ClientConn
- client mount_pb.SeaweedMountClient
-}
-
-func newMountProcessClient(localSocket string) (*mountProcessClient, error) {
- target := fmt.Sprintf("passthrough:///unix://%s", localSocket)
- conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, err)
- }
-
- return &mountProcessClient{
- conn: conn,
- client: mount_pb.NewSeaweedMountClient(conn),
- }, nil
-}
-
-func (c *mountProcessClient) Close() error {
- return c.conn.Close()
-}
-
-func (c *mountProcessClient) Configure(capacity int64) error {
- if capacity == 1 {
- capacity = 0
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- _, err := c.client.Configure(ctx, &mount_pb.ConfigureRequest{CollectionCapacity: capacity})
- return err
-}
diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go
index bd63146..1b8a079 100644
--- a/pkg/mountmanager/socket.go
+++ b/pkg/mountmanager/socket.go
@@ -12,5 +12,5 @@ func LocalSocketPath(volumeID string) string {
if hash < 0 {
hash = -hash
}
- return fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash)
+ return fmt.Sprintf("/var/lib/seaweedfs-mount/seaweedfs-mount-%d.sock", hash)
}
diff --git a/pkg/mountmanager/types.go b/pkg/mountmanager/types.go
index 37f791f..b15fe36 100644
--- a/pkg/mountmanager/types.go
+++ b/pkg/mountmanager/types.go
@@ -2,18 +2,11 @@ package mountmanager
// MountRequest contains all information needed to start a weed mount process.
type MountRequest struct {
- VolumeID string `json:"volumeId"`
- TargetPath string `json:"targetPath"`
- ReadOnly bool `json:"readOnly"`
- Filers []string `json:"filers"`
- CacheDir string `json:"cacheDir"`
- CacheCapacityMB int `json:"cacheCapacityMB"`
- ConcurrentWriters int `json:"concurrentWriters"`
- UidMap string `json:"uidMap"`
- GidMap string `json:"gidMap"`
- DataCenter string `json:"dataCenter"`
- DataLocality string `json:"dataLocality"`
- VolumeContext map[string]string `json:"volumeContext"`
+ VolumeID string `json:"volumeId"`
+ TargetPath string `json:"targetPath"`
+ CacheDir string `json:"cacheDir"`
+ MountArgs []string `json:"mountArgs"`
+ LocalSocket string `json:"localSocket"`
}
// MountResponse is returned after a successful mount request.
@@ -29,15 +22,6 @@ type UnmountRequest struct {
// UnmountResponse is the response of a successful unmount request.
type UnmountResponse struct{}
-// ConfigureRequest adjusts the behaviour of an existing mount.
-type ConfigureRequest struct {
- VolumeID string `json:"volumeId"`
- CollectionCapacity int64 `json:"collectionCapacity"`
-}
-
-// ConfigureResponse represents a successful configure call.
-type ConfigureResponse struct{}
-
// ErrorResponse is returned when the mount service encounters a failure.
type ErrorResponse struct {
Error string `json:"error"`