From 2828d5a05c36aa8719778142eb4472007906f14c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B3=BD=E6=B7=BC=20=E5=91=A8?= Date: Fri, 26 Sep 2025 19:54:44 +0800 Subject: feat: Separated weed mount lifecycle into a dedicated service and rewired the CSI components to call it. --- pkg/mountmanager/client.go | 110 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 pkg/mountmanager/client.go (limited to 'pkg/mountmanager/client.go') diff --git a/pkg/mountmanager/client.go b/pkg/mountmanager/client.go new file mode 100644 index 0000000..8a53d00 --- /dev/null +++ b/pkg/mountmanager/client.go @@ -0,0 +1,110 @@ +package mountmanager + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "time" +) + +// Client talks to the mount service over a Unix domain socket. +type Client struct { + httpClient *http.Client + baseURL string +} + +// NewClient builds a new Client for the given endpoint. +func NewClient(endpoint string) (*Client, error) { + scheme, address, err := ParseEndpoint(endpoint) + if err != nil { + return nil, err + } + if scheme != "unix" { + return nil, fmt.Errorf("unsupported endpoint scheme: %s", scheme) + } + + dialer := &net.Dialer{} + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "unix", address) + }, + } + + return &Client{ + httpClient: &http.Client{ + Timeout: 30 * time.Second, + Transport: transport, + }, + baseURL: "http://unix", + }, nil +} + +// Mount mounts a volume using the mount service. +func (c *Client) Mount(req *MountRequest) (*MountResponse, error) { + var resp MountResponse + if err := c.doPost("/mount", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +// Unmount unmounts a volume using the mount service. +func (c *Client) Unmount(req *UnmountRequest) (*UnmountResponse, error) { + var resp UnmountResponse + if err := c.doPost("/unmount", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +// Configure updates runtime options such as quota for an existing mount. +func (c *Client) Configure(req *ConfigureRequest) (*ConfigureResponse, error) { + var resp ConfigureResponse + if err := c.doPost("/configure", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) doPost(path string, payload any, out any) error { + body := &bytes.Buffer{} + if err := json.NewEncoder(body).Encode(payload); err != nil { + return fmt.Errorf("encode request: %w", err) + } + + req, err := http.NewRequest(http.MethodPost, c.baseURL+path, body) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("call mount service: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + var errResp ErrorResponse + if err := json.NewDecoder(resp.Body).Decode(&errResp); err == nil && errResp.Error != "" { + return errors.New(errResp.Error) + } + data, _ := io.ReadAll(resp.Body) + return fmt.Errorf("mount service error: %s (%s)", resp.Status, string(data)) + } + + if out == nil { + _, _ = io.Copy(io.Discard, resp.Body) + return nil + } + + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode response: %w", err) + } + return nil +} -- cgit v1.2.3