From 504e64b8ab3fcdc58092473479d459b52316e5ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B3=BD=E6=B7=BC=20=E5=91=A8?= Date: Sat, 27 Sep 2025 18:28:34 +0800 Subject: Optimization: Reduce unnecessary logic of seaweedfs-mount --- cmd/seaweedfs-mount/main.go | 21 ----- pkg/driver/mounter.go | 144 ++++++++++++++++++++++++++---- pkg/driver/volume.go | 27 ++++-- pkg/mountmanager/client.go | 9 -- pkg/mountmanager/manager.go | 210 ++++---------------------------------------- pkg/mountmanager/socket.go | 2 +- pkg/mountmanager/types.go | 26 ++---- 7 files changed, 168 insertions(+), 271 deletions(-) diff --git a/cmd/seaweedfs-mount/main.go b/cmd/seaweedfs-mount/main.go index 174b530..12151e8 100644 --- a/cmd/seaweedfs-mount/main.go +++ b/cmd/seaweedfs-mount/main.go @@ -90,27 +90,6 @@ func main() { writeJSON(w, http.StatusOK, resp) }) - mux.HandleFunc("/configure", func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var req mountmanager.ConfigureRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid request: "+err.Error()) - return - } - - resp, err := manager.Configure(&req) - if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - writeJSON(w, http.StatusOK, resp) - }) - mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index 6beee55..d86d82e 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -2,7 +2,12 @@ package driver import ( "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -58,29 +63,29 @@ func (m *mountServiceMounter) Mount(target string) (Unmounter, error) { filers[i] = string(address) } - req := &mountmanager.MountRequest{ - VolumeID: m.volumeID, - TargetPath: target, - ReadOnly: m.readOnly, - Filers: filers, - CacheDir: m.driver.CacheDir, - CacheCapacityMB: m.driver.CacheCapacityMB, - ConcurrentWriters: m.driver.ConcurrentWriters, - UidMap: m.driver.UidMap, - GidMap: m.driver.GidMap, - DataCenter: m.driver.DataCenter, - DataLocality: m.driver.DataLocality.String(), - VolumeContext: m.volContext, - } - - resp, err := m.client.Mount(req) + cacheBase := m.driver.CacheDir + if cacheBase == "" { + cacheBase = os.TempDir() + } + cacheDir := filepath.Join(cacheBase, m.volumeID) + localSocket := mountmanager.LocalSocketPath(m.volumeID) + + args, err := m.buildMountArgs(target, cacheDir, localSocket, filers) if err != nil { return nil, err } - expectedSocket := mountmanager.LocalSocketPath(m.volumeID) - if resp.LocalSocket != "" && resp.LocalSocket != expectedSocket { - glog.Warningf("mount service returned socket %s for volume %s (expected %s)", resp.LocalSocket, m.volumeID, expectedSocket) + req := &mountmanager.MountRequest{ + VolumeID: m.volumeID, + TargetPath: target, + CacheDir: cacheDir, + MountArgs: args, + LocalSocket: localSocket, + } + + _, err = m.client.Mount(req) + if err != nil { + return nil, err } return &mountServiceUnmounter{ @@ -93,3 +98,104 @@ func (u *mountServiceUnmounter) Unmount() error { _, err := u.client.Unmount(&mountmanager.UnmountRequest{VolumeID: u.volumeID}) return err } + +func (m *mountServiceMounter) buildMountArgs(targetPath, cacheDir, localSocket string, filers []string) ([]string, error) { + volumeContext := m.volContext + if volumeContext == nil { + volumeContext = map[string]string{} + } + + path := volumeContext["path"] + if path == "" { + path = fmt.Sprintf("/buckets/%s", m.volumeID) + } + + collection := volumeContext["collection"] + if collection == "" { + collection = m.volumeID + } + + args := []string{ + "-logtostderr=true", + "mount", + "-dirAutoCreate=true", + "-umask=000", + fmt.Sprintf("-dir=%s", targetPath), + fmt.Sprintf("-localSocket=%s", localSocket), + fmt.Sprintf("-cacheDir=%s", cacheDir), + } + + if m.readOnly { + args = append(args, "-readOnly") + } + + argsMap := map[string]string{ + "collection": collection, + "filer": strings.Join(filers, ","), + "filer.path": path, + "cacheCapacityMB": strconv.Itoa(m.driver.CacheCapacityMB), + "concurrentWriters": strconv.Itoa(m.driver.ConcurrentWriters), + "map.uid": m.driver.UidMap, + "map.gid": m.driver.GidMap, + "disk": "", + "dataCenter": "", + "replication": "", + "ttl": "", + "chunkSizeLimitMB": "", + "volumeServerAccess": "", + "readRetryTime": "", + } + + dataLocality := m.driver.DataLocality + if contextLocality, ok := volumeContext["dataLocality"]; ok && contextLocality != "" { + if dl, ok := datalocality.FromString(contextLocality); ok { + dataLocality = dl + } else { + return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality) + } + } + + dataCenter := m.driver.DataCenter + if err := CheckDataLocality(&dataLocality, &dataCenter); err != nil { + return nil, err + } + + switch dataLocality { + case datalocality.Write_preferLocalDc: + argsMap["dataCenter"] = dataCenter + } + + parameterArgMap := map[string]string{ + "uidMap": "map.uid", + "gidMap": "map.gid", + "filerPath": "filer.path", + "diskType": "disk", + } + + ignoredArgs := map[string]struct{}{"dataLocality": {}} + + for key, value := range volumeContext { + if _, ignored := ignoredArgs[key]; ignored { + continue + } + if mapped, ok := parameterArgMap[key]; ok { + key = mapped + } + if _, ok := argsMap[key]; !ok { + glog.Warningf("VolumeContext '%s' ignored", key) + continue + } + if value != "" { + argsMap[key] = value + } + } + + for key, value := range argsMap { + if value == "" { + continue + } + args = append(args, fmt.Sprintf("-%s=%s", key, value)) + } + + return args, nil +} diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 080595e..9e258a8 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -1,10 +1,15 @@ package driver import ( + "context" + "fmt" "os" "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "k8s.io/mount-utils" ) @@ -14,14 +19,18 @@ type Volume struct { mounter Mounter unmounter Unmounter - driver *SeaweedFsDriver + + // unix socket used to manage volume + localSocket string + driver *SeaweedFsDriver } func NewVolume(volumeID string, mounter Mounter, driver *SeaweedFsDriver) *Volume { return &Volume{ - VolumeId: volumeID, - mounter: mounter, - driver: driver, + VolumeId: volumeID, + mounter: mounter, + localSocket: mountmanager.LocalSocketPath(volumeID), + driver: driver, } } @@ -75,18 +84,22 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly } func (vol *Volume) Quota(sizeByte int64) error { - client, err := mountmanager.NewClient(vol.driver.mountEndpoint) + target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + + clientConn, err := grpc.Dial(target, dialOption) if err != nil { return err } + defer clientConn.Close() // We can't create PV of zero size, so we're using quota of 1 byte to define no quota. if sizeByte == 1 { sizeByte = 0 } - _, err = client.Configure(&mountmanager.ConfigureRequest{ - VolumeID: vol.VolumeId, + client := mount_pb.NewSeaweedMountClient(clientConn) + _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ CollectionCapacity: sizeByte, }) return err diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go index 8a53d00..6288c44 100644 --- a/pkg/mountmanager/client.go +++ b/pkg/mountmanager/client.go @@ -62,15 +62,6 @@ func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) { return &resp, nil } -// Configure updates runtime options such as quota for an existing mount. -func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { - var resp ConfigureResponse - if err := c.doPost("/configure", req, &resp); err != nil { - return nil, err - } - return &resp, nil -} - func (c *Client) doPost(path string, payload any, out any) error { body := &bytes.Buffer{} if err := json.NewEncoder(body).Encode(payload); err != nil { diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go index 0553af5..26dda18 100644 --- a/pkg/mountmanager/manager.go +++ b/pkg/mountmanager/manager.go @@ -1,23 +1,16 @@ package mountmanager import ( - "context" "errors" "fmt" "os" "os/exec" - "path/filepath" - "strconv" "strings" "sync" "syscall" "time" - "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "k8s.io/mount-utils" ) @@ -145,19 +138,22 @@ func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) { return nil, err } - cacheBase := req.CacheDir - if cacheBase == "" { - cacheBase = os.TempDir() + cacheDir := req.CacheDir + if cacheDir == "" { + return nil, errors.New("cacheDir is required") } - cacheDir := filepath.Join(cacheBase, req.VolumeID) if err := os.MkdirAll(cacheDir, 0750); err != nil { return nil, fmt.Errorf("creating cache dir: %w", err) } - localSocket := LocalSocketPath(req.VolumeID) - args, err := buildMountArgs(req, targetPath, cacheDir, localSocket) - if err != nil { - return nil, err + localSocket := req.LocalSocket + if localSocket == "" { + return nil, errors.New("localSocket is required") + } + + args := req.MountArgs + if len(args) == 0 { + return nil, errors.New("mountArgs is required") } process, err := startWeedMountProcess(m.weedBinary, args, targetPath) @@ -203,156 +199,18 @@ func validateMountRequest(req *MountRequest) error { if req.TargetPath == "" { return errors.New("targetPath is required") } - if len(req.Filers) == 0 { - return errors.New("at least one filer is required") - } - return nil -} - -func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) { - volumeContext := req.VolumeContext - if volumeContext == nil { - volumeContext = map[string]string{} - } - - path := volumeContext["path"] - if path == "" { - path = fmt.Sprintf("/buckets/%s", req.VolumeID) - } - - collection := volumeContext["collection"] - if collection == "" { - collection = req.VolumeID - } - - args := []string{ - "-logtostderr=true", - "mount", - "-dirAutoCreate=true", - "-umask=000", - fmt.Sprintf("-dir=%s", targetPath), - fmt.Sprintf("-localSocket=%s", localSocket), - fmt.Sprintf("-cacheDir=%s", cacheDir), - } - - if req.ReadOnly { - args = append(args, "-readOnly") - } - - argsMap := map[string]string{ - "collection": collection, - "filer": strings.Join(req.Filers, ","), - "filer.path": path, - "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB), - "concurrentWriters": strconv.Itoa(req.ConcurrentWriters), - "map.uid": req.UidMap, - "map.gid": req.GidMap, - "disk": "", - "dataCenter": "", - "replication": "", - "ttl": "", - "chunkSizeLimitMB": "", - "volumeServerAccess": "", - "readRetryTime": "", + if req.CacheDir == "" { + return errors.New("cacheDir is required") } - - dataLocality := datalocality.None - if req.DataLocality != "" { - if dl, ok := datalocality.FromString(req.DataLocality); ok { - dataLocality = dl - } else { - return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality) - } + if req.LocalSocket == "" { + return errors.New("localSocket is required") } - - if contextLocality, ok := volumeContext["dataLocality"]; ok { - if dl, ok := datalocality.FromString(contextLocality); ok { - dataLocality = dl - } else { - return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality) - } - } - - if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil { - return nil, err - } - - switch dataLocality { - case datalocality.Write_preferLocalDc: - argsMap["dataCenter"] = req.DataCenter - } - - parameterArgMap := map[string]string{ - "uidMap": "map.uid", - "gidMap": "map.gid", - "filerPath": "filer.path", - "diskType": "disk", - } - - ignoredArgs := map[string]struct{}{"dataLocality": {}} - - for key, value := range volumeContext { - if _, ignored := ignoredArgs[key]; ignored { - continue - } - if mapped, ok := parameterArgMap[key]; ok { - key = mapped - } - if _, ok := argsMap[key]; !ok { - glog.Warningf("VolumeContext '%s' ignored", key) - continue - } - if value != "" { - argsMap[key] = value - } - } - - for key, value := range argsMap { - if value == "" { - continue - } - args = append(args, fmt.Sprintf("-%s=%s", key, value)) - } - - return args, nil -} - -func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error { - if *dataLocality != datalocality.None && dataCenter == "" { - return fmt.Errorf("dataLocality set, but dataCenter is empty") + if len(req.MountArgs) == 0 { + return errors.New("mountArgs is required") } return nil } -// Configure updates mount-level settings, e.g. quota. Intended to be invoked by a trusted caller. -func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { - if req == nil { - return nil, errors.New("configure request is nil") - } - - lock := m.locks.get(req.VolumeID) - lock.Lock() - defer lock.Unlock() - - entry := m.getMount(req.VolumeID) - if entry == nil { - m.locks.delete(req.VolumeID) - return nil, fmt.Errorf("volume %s not mounted", req.VolumeID) - } - - client, err := newMountProcessClient(entry.localSocket) - if err != nil { - return nil, err - } - defer client.Close() - - if err := client.Configure(req.CollectionCapacity); err != nil { - return nil, err - } - - return &ConfigureResponse{}, nil -} - type mountEntry struct { volumeID string targetPath string @@ -454,37 +312,3 @@ func waitForMount(path string, timeout time.Duration) error { } } } - -type mountProcessClient struct { - conn *grpc.ClientConn - client mount_pb.SeaweedMountClient -} - -func newMountProcessClient(localSocket string) (*mountProcessClient, error) { - target := fmt.Sprintf("passthrough:///unix://%s", localSocket) - conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, err) - } - - return &mountProcessClient{ - conn: conn, - client: mount_pb.NewSeaweedMountClient(conn), - }, nil -} - -func (c *mountProcessClient) Close() error { - return c.conn.Close() -} - -func (c *mountProcessClient) Configure(capacity int64) error { - if capacity == 1 { - capacity = 0 - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - _, err := c.client.Configure(ctx, &mount_pb.ConfigureRequest{CollectionCapacity: capacity}) - return err -} diff --git a/pkg/mountmanager/socket.go b/pkg/mountmanager/socket.go index bd63146..1b8a079 100644 --- a/pkg/mountmanager/socket.go +++ b/pkg/mountmanager/socket.go @@ -12,5 +12,5 @@ func LocalSocketPath(volumeID string) string { if hash < 0 { hash = -hash } - return fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", hash) + return fmt.Sprintf("/var/lib/seaweedfs-mount/seaweedfs-mount-%d.sock", hash) } diff --git a/pkg/mountmanager/types.go b/pkg/mountmanager/types.go index 37f791f..b15fe36 100644 --- a/pkg/mountmanager/types.go +++ b/pkg/mountmanager/types.go @@ -2,18 +2,11 @@ package mountmanager // MountRequest contains all information needed to start a weed mount process. type MountRequest struct { - VolumeID string `json:"volumeId"` - TargetPath string `json:"targetPath"` - ReadOnly bool `json:"readOnly"` - Filers []string `json:"filers"` - CacheDir string `json:"cacheDir"` - CacheCapacityMB int `json:"cacheCapacityMB"` - ConcurrentWriters int `json:"concurrentWriters"` - UidMap string `json:"uidMap"` - GidMap string `json:"gidMap"` - DataCenter string `json:"dataCenter"` - DataLocality string `json:"dataLocality"` - VolumeContext map[string]string `json:"volumeContext"` + VolumeID string `json:"volumeId"` + TargetPath string `json:"targetPath"` + CacheDir string `json:"cacheDir"` + MountArgs []string `json:"mountArgs"` + LocalSocket string `json:"localSocket"` } // MountResponse is returned after a successful mount request. @@ -29,15 +22,6 @@ type UnmountRequest struct { // UnmountResponse is the response of a successful unmount request. type UnmountResponse struct{} -// ConfigureRequest adjusts the behaviour of an existing mount. -type ConfigureRequest struct { - VolumeID string `json:"volumeId"` - CollectionCapacity int64 `json:"collectionCapacity"` -} - -// ConfigureResponse represents a successful configure call. -type ConfigureResponse struct{} - // ErrorResponse is returned when the mount service encounters a failure. type ErrorResponse struct { Error string `json:"error"` -- cgit v1.2.3