diff options
| author | 泽淼 周 <zhouzemiao@ihuman.com> | 2025-09-27 18:28:34 +0800 |
|---|---|---|
| committer | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-06 18:53:22 -0800 |
| commit | da9e11f28bab3c90107c3a730ddbfb57d18475ca (patch) | |
| tree | 10b98b83c5b551bf7cccf71edb4b41c6fb40228d /pkg/mountmanager/manager.go | |
| parent | 1a36b03ed79a12e9b9db5954167ce825222f3d83 (diff) | |
| download | seaweedfs-csi-driver-da9e11f28bab3c90107c3a730ddbfb57d18475ca.tar.xz seaweedfs-csi-driver-da9e11f28bab3c90107c3a730ddbfb57d18475ca.zip | |
Optimization: Reduce unnecessary logic of seaweedfs-mount
Diffstat (limited to 'pkg/mountmanager/manager.go')
| -rw-r--r-- | pkg/mountmanager/manager.go | 210 |
1 files changed, 17 insertions, 193 deletions
diff --git a/pkg/mountmanager/manager.go b/pkg/mountmanager/manager.go index 0553af5..26dda18 100644 --- a/pkg/mountmanager/manager.go +++ b/pkg/mountmanager/manager.go @@ -1,23 +1,16 @@ package mountmanager import ( - "context" "errors" "fmt" "os" "os/exec" - "path/filepath" - "strconv" "strings" "sync" "syscall" "time" - "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "k8s.io/mount-utils" ) @@ -145,19 +138,22 @@ func (m *Manager) startMount(req *MountRequest) (*mountEntry, error) { return nil, err } - cacheBase := req.CacheDir - if cacheBase == "" { - cacheBase = os.TempDir() + cacheDir := req.CacheDir + if cacheDir == "" { + return nil, errors.New("cacheDir is required") } - cacheDir := filepath.Join(cacheBase, req.VolumeID) if err := os.MkdirAll(cacheDir, 0750); err != nil { return nil, fmt.Errorf("creating cache dir: %w", err) } - localSocket := LocalSocketPath(req.VolumeID) - args, err := buildMountArgs(req, targetPath, cacheDir, localSocket) - if err != nil { - return nil, err + localSocket := req.LocalSocket + if localSocket == "" { + return nil, errors.New("localSocket is required") + } + + args := req.MountArgs + if len(args) == 0 { + return nil, errors.New("mountArgs is required") } process, err := startWeedMountProcess(m.weedBinary, args, targetPath) @@ -203,156 +199,18 @@ func validateMountRequest(req *MountRequest) error { if req.TargetPath == "" { return errors.New("targetPath is required") } - if len(req.Filers) == 0 { - return errors.New("at least one filer is required") - } - return nil -} - -func buildMountArgs(req *MountRequest, targetPath, cacheDir, localSocket string) ([]string, error) { - volumeContext := req.VolumeContext - if volumeContext == nil { - volumeContext = map[string]string{} - } - - path := volumeContext["path"] - if path == "" { - path = fmt.Sprintf("/buckets/%s", req.VolumeID) - } - - collection := volumeContext["collection"] - if collection == "" { - collection = req.VolumeID - } - - args := []string{ - "-logtostderr=true", - "mount", - "-dirAutoCreate=true", - "-umask=000", - fmt.Sprintf("-dir=%s", targetPath), - fmt.Sprintf("-localSocket=%s", localSocket), - fmt.Sprintf("-cacheDir=%s", cacheDir), - } - - if req.ReadOnly { - args = append(args, "-readOnly") - } - - argsMap := map[string]string{ - "collection": collection, - "filer": strings.Join(req.Filers, ","), - "filer.path": path, - "cacheCapacityMB": strconv.Itoa(req.CacheCapacityMB), - "concurrentWriters": strconv.Itoa(req.ConcurrentWriters), - "map.uid": req.UidMap, - "map.gid": req.GidMap, - "disk": "", - "dataCenter": "", - "replication": "", - "ttl": "", - "chunkSizeLimitMB": "", - "volumeServerAccess": "", - "readRetryTime": "", + if req.CacheDir == "" { + return errors.New("cacheDir is required") } - - dataLocality := datalocality.None - if req.DataLocality != "" { - if dl, ok := datalocality.FromString(req.DataLocality); ok { - dataLocality = dl - } else { - return nil, fmt.Errorf("invalid dataLocality: %s", req.DataLocality) - } + if req.LocalSocket == "" { + return errors.New("localSocket is required") } - - if contextLocality, ok := volumeContext["dataLocality"]; ok { - if dl, ok := datalocality.FromString(contextLocality); ok { - dataLocality = dl - } else { - return nil, fmt.Errorf("invalid volumeContext dataLocality: %s", contextLocality) - } - } - - if err := checkDataLocality(&dataLocality, req.DataCenter); err != nil { - return nil, err - } - - switch dataLocality { - case datalocality.Write_preferLocalDc: - argsMap["dataCenter"] = req.DataCenter - } - - parameterArgMap := map[string]string{ - "uidMap": "map.uid", - "gidMap": "map.gid", - "filerPath": "filer.path", - "diskType": "disk", - } - - ignoredArgs := map[string]struct{}{"dataLocality": {}} - - for key, value := range volumeContext { - if _, ignored := ignoredArgs[key]; ignored { - continue - } - if mapped, ok := parameterArgMap[key]; ok { - key = mapped - } - if _, ok := argsMap[key]; !ok { - glog.Warningf("VolumeContext '%s' ignored", key) - continue - } - if value != "" { - argsMap[key] = value - } - } - - for key, value := range argsMap { - if value == "" { - continue - } - args = append(args, fmt.Sprintf("-%s=%s", key, value)) - } - - return args, nil -} - -func checkDataLocality(dataLocality *datalocality.DataLocality, dataCenter string) error { - if *dataLocality != datalocality.None && dataCenter == "" { - return fmt.Errorf("dataLocality set, but dataCenter is empty") + if len(req.MountArgs) == 0 { + return errors.New("mountArgs is required") } return nil } -// Configure updates mount-level settings, e.g. quota. Intended to be invoked by a trusted caller. -func (m *Manager) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { - if req == nil { - return nil, errors.New("configure request is nil") - } - - lock := m.locks.get(req.VolumeID) - lock.Lock() - defer lock.Unlock() - - entry := m.getMount(req.VolumeID) - if entry == nil { - m.locks.delete(req.VolumeID) - return nil, fmt.Errorf("volume %s not mounted", req.VolumeID) - } - - client, err := newMountProcessClient(entry.localSocket) - if err != nil { - return nil, err - } - defer client.Close() - - if err := client.Configure(req.CollectionCapacity); err != nil { - return nil, err - } - - return &ConfigureResponse{}, nil -} - type mountEntry struct { volumeID string targetPath string @@ -454,37 +312,3 @@ func waitForMount(path string, timeout time.Duration) error { } } } - -type mountProcessClient struct { - conn *grpc.ClientConn - client mount_pb.SeaweedMountClient -} - -func newMountProcessClient(localSocket string) (*mountProcessClient, error) { - target := fmt.Sprintf("passthrough:///unix://%s", localSocket) - conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("dial mount socket %s: %w", localSocket, err) - } - - return &mountProcessClient{ - conn: conn, - client: mount_pb.NewSeaweedMountClient(conn), - }, nil -} - -func (c *mountProcessClient) Close() error { - return c.conn.Close() -} - -func (c *mountProcessClient) Configure(capacity int64) error { - if capacity == 1 { - capacity = 0 - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - _, err := c.client.Configure(ctx, &mount_pb.ConfigureRequest{CollectionCapacity: capacity}) - return err -} |
