aboutsummaryrefslogtreecommitdiff
path: root/pkg/mountmanager
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/mountmanager')
-rw-r--r--pkg/mountmanager/client.go9
-rw-r--r--pkg/mountmanager/manager.go210
-rw-r--r--pkg/mountmanager/socket.go2
-rw-r--r--pkg/mountmanager/types.go26
4 files changed, 23 insertions, 224 deletions
diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go
index 8a53d00..6288c44 100644
--- a/pkg/mountmanager/client.go
+++ b/pkg/mountmanager/client.go
@@ -62,15 +62,6 @@ func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) {
return &resp, nil
}
-// Configure updates runtime options such as quota for an existing mount.
-func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
- var resp ConfigureResponse
- if err := c.doPost("/configure", req, &resp); err != nil {
- return nil, err
- }
- return &resp, nil
-}
-
func (c *Client) doPost(path string, payload any, out any) error {
body := &bytes.Buffer{}
if err := json.NewEncoder(body).Encode(payload); err != nil {
diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go
index 0553af5..26dda18 100644
--- a/pkg/mountmanager/manager.go
+++ b/pkg/mountmanager/manager.go
@@ -1,23 +1,16 @@
package mountmanager
import (
- "context"
"errors"
"fmt"
"os"
"os/exec"
- "path/filepath"
- "strconv"
"strings"
"sync"
"syscall"
"time"
- "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)
@@ -145,19 +138,22 @@ func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) {
return nil, err
}
- cacheBase := req.CacheDir
- if cacheBase == "" {
- cacheBase = os.TempDir()
+ cacheDir := req.CacheDir
+ if cacheDir == "" {
+ return nil, errors.New("cacheDir is required")
}
- cacheDir := filepath.Join(cacheBase, req.VolumeID)
if err := os.MkdirAll(cacheDir, 0750); err != nil {
return nil, fmt.Errorf("creating cache dir: %w", err)
}
- localSocket := LocalSocketPath(req.VolumeID)
- args, err := buildMountArgs(req, targetPath, cacheDir, localSocket)
- if err != nil {
- return nil, err
+ localSocket := req.LocalSocket
+ if localSocket == "" {
+ return nil, errors.New("localSocket is required")
+ }
+
+ args := req.MountArgs
+ if len(args) == 0 {
+ return nil, errors.New("mountArgs is required")
}
process, err := startWeedMountProcess(m.weedBinary, args, targetPath)
@@ -203,156 +199,18 @@ func validateMountRequest(req *MountRequest) error {
if req.TargetPath == "" {
return errors.New("targetPath is required")
}
- if len(req.Filers) == 0 {
- return errors.New("at least one filer is required")
- }
- return nil
-}
-
-func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) {
- volumeContext := req.VolumeContext
- if volumeContext == nil {
- volumeContext = map[string]string{}
- }
-
- path := volumeContext["path"]
- if path == "" {
- path = fmt.Sprintf("/buckets/%s", req.VolumeID)
- }
-
- collection := volumeContext["collection"]
- if collection == "" {
- collection = req.VolumeID
- }
-
- args := []string{
- "-logtostderr=true",
- "mount",
- "-dirAutoCreate=true",
- "-umask=000",
- fmt.Sprintf("-dir=%s", targetPath),
- fmt.Sprintf("-localSocket=%s", localSocket),
- fmt.Sprintf("-cacheDir=%s", cacheDir),
- }
-
- if req.ReadOnly {
- args = append(args, "-readOnly")
- }
-
- argsMap := map[string]string{
- "collection": collection,
- "filer": strings.Join(req.Filers, ","),
- "filer.path": path,
- "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB),
- "concurrentWriters": strconv.Itoa(req.ConcurrentWriters),
- "map.uid": req.UidMap,
- "map.gid": req.GidMap,
- "disk": "",
- "dataCenter": "",
- "replication": "",
- "ttl": "",
- "chunkSizeLimitMB": "",
- "volumeServerAccess": "",
- "readRetryTime": "",
+ if req.CacheDir == "" {
+ return errors.New("cacheDir is required")
}
-
- dataLocality := datalocality.None
- if req.DataLocality != "" {
- if dl, ok := datalocality.FromString(req.DataLocality); ok {
- dataLocality = dl
- } else {
- return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality)
- }
+ if req.LocalSocket == "" {
+ return errors.New("localSocket is required")
}
-
- if contextLocality, ok := volumeContext["dataLocality"]; ok {
- if dl, ok := datalocality.FromString(contextLocality); ok {
- dataLocality = dl
- } else {
- return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality)
- }
- }
-
- if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil {
- return nil, err
- }
-
- switch dataLocality {
- case datalocality.Write_preferLocalDc:
- argsMap["dataCenter"] = req.DataCenter
- }
-
- parameterArgMap := map[string]string{
- "uidMap": "map.uid",
- "gidMap": "map.gid",
- "filerPath": "filer.path",
- "diskType": "disk",
- }
-
- ignoredArgs := map[string]struct{}{"dataLocality": {}}
-
- for key, value := range volumeContext {
- if _, ignored := ignoredArgs[key]; ignored {
- continue
- }
- if mapped, ok := parameterArgMap[key]; ok {
- key = mapped
- }
- if _, ok := argsMap[key]; !ok {
- glog.Warningf("VolumeContext '%s' ignored", key)
- continue
- }
- if value != "" {
- argsMap[key] = value
- }
- }
-
- for key, value := range argsMap {
- if value == "" {
- continue
- }
- args = append(args, fmt.Sprintf("-%s=%s", key, value))
- }
-
- return args, nil
-}
-
-func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error {
- if *dataLocality != datalocality.None && dataCenter == "" {
- return fmt.Errorf("dataLocality set, but dataCenter is empty")
+ if len(req.MountArgs) == 0 {
+ return errors.New("mountArgs is required")
}
return nil
}
-// Configure updates mount-level settings, e.g. quota. Intended to be invoked by a trusted caller.
-func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) {
- if req == nil {
- return nil, errors.New("configure request is nil")
- }
-
- lock := m.locks.get(req.VolumeID)
- lock.Lock()
- defer lock.Unlock()
-
- entry := m.getMount(req.VolumeID)
- if entry == nil {
- m.locks.delete(req.VolumeID)
- return nil, fmt.Errorf("volume %s not mounted", req.VolumeID)
- }
-
- client, err := newMountProcessClient(entry.localSocket)
- if err != nil {
- return nil, err
- }
- defer client.Close()
-
- if err := client.Configure(req.CollectionCapacity); err != nil {
- return nil, err
- }
-
- return &ConfigureResponse{}, nil
-}
-
type mountEntry struct {
volumeID string
targetPath string
@@ -454,37 +312,3 @@ func waitForMount(path string, timeout time.Duration) error {
}
}
}
-
-type mountProcessClient struct {
- conn *grpc.ClientConn
- client mount_pb.SeaweedMountClient
-}
-
-func newMountProcessClient(localSocket string) (*mountProcessClient, error) {
- target := fmt.Sprintf("passthrough:///unix://%s", localSocket)
- conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, err)
- }
-
- return &mountProcessClient{
- conn: conn,
- client: mount_pb.NewSeaweedMountClient(conn),
- }, nil
-}
-
-func (c *mountProcessClient) Close() error {
- return c.conn.Close()
-}
-
-func (c *mountProcessClient) Configure(capacity int64) error {
- if capacity == 1 {
- capacity = 0
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- _, err := c.client.Configure(ctx, &mount_pb.ConfigureRequest{CollectionCapacity: capacity})
- return err
-}
diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go
index bd63146..1b8a079 100644
--- a/pkg/mountmanager/socket.go
+++ b/pkg/mountmanager/socket.go
@@ -12,5 +12,5 @@ func LocalSocketPath(volumeID string) string {
if hash < 0 {
hash = -hash
}
- return fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash)
+ return fmt.Sprintf("/var/lib/seaweedfs-mount/seaweedfs-mount-%d.sock", hash)
}
diff --git a/pkg/mountmanager/types.go b/pkg/mountmanager/types.go
index 37f791f..b15fe36 100644
--- a/pkg/mountmanager/types.go
+++ b/pkg/mountmanager/types.go
@@ -2,18 +2,11 @@ package mountmanager
// MountRequest contains all information needed to start a weed mount process.
type MountRequest struct {
- VolumeID string `json:"volumeId"`
- TargetPath string `json:"targetPath"`
- ReadOnly bool `json:"readOnly"`
- Filers []string `json:"filers"`
- CacheDir string `json:"cacheDir"`
- CacheCapacityMB int `json:"cacheCapacityMB"`
- ConcurrentWriters int `json:"concurrentWriters"`
- UidMap string `json:"uidMap"`
- GidMap string `json:"gidMap"`
- DataCenter string `json:"dataCenter"`
- DataLocality string `json:"dataLocality"`
- VolumeContext map[string]string `json:"volumeContext"`
+ VolumeID string `json:"volumeId"`
+ TargetPath string `json:"targetPath"`
+ CacheDir string `json:"cacheDir"`
+ MountArgs []string `json:"mountArgs"`
+ LocalSocket string `json:"localSocket"`
}
// MountResponse is returned after a successful mount request.
@@ -29,15 +22,6 @@ type UnmountRequest struct {
// UnmountResponse is the response of a successful unmount request.
type UnmountResponse struct{}
-// ConfigureRequest adjusts the behaviour of an existing mount.
-type ConfigureRequest struct {
- VolumeID string `json:"volumeId"`
- CollectionCapacity int64 `json:"collectionCapacity"`
-}
-
-// ConfigureResponse represents a successful configure call.
-type ConfigureResponse struct{}
-
// ErrorResponse is returned when the mount service encounters a failure.
type ErrorResponse struct {
Error string `json:"error"`