aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
author泽淼 周 <zhouzemiao@ihuman.com>2025-09-26 19:54:44 +0800
committerChris Lu <chrislusf@users.noreply.github.com>2025-12-06 18:53:22 -0800
commit2828d5a05c36aa8719778142eb4472007906f14c (patch)
tree7a60daa0bc9cf86204ddeddc94bc7dc748483cb5 /cmd
parentfd2b35494095ccf7b06fb210305406f83ed17998 (diff)
downloadseaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.tar.xz
seaweedfs-csi-driver-2828d5a05c36aa8719778142eb4472007906f14c.zip
feat: Separated weed mount lifecycle into a dedicated service and rewired the CSI components to call it.
Diffstat (limited to 'cmd')
-rw-r--r--cmd/seaweedfs-csi-driver/main.go3
-rw-r--r--cmd/seaweedfs-mount/Dockerfile20
-rw-r--r--cmd/seaweedfs-mount/main.go153
3 files changed, 175 insertions, 1 deletions
diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go
index f749db6..45bb810 100644
--- a/cmd/seaweedfs-csi-driver/main.go
+++ b/cmd/seaweedfs-csi-driver/main.go
@@ -19,6 +19,7 @@ var (
filer = flag.String("filer", "localhost:8888", "filer server")
endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
+ mountEndpoint = flag.String("mountEndpoint", "unix:///tmp/seaweedfs-mount.sock", "mount service endpoint")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
concurrentWriters = flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
@@ -78,7 +79,7 @@ func main() {
glog.Infof("connect to filer %s", *filer)
- drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *enableAttacher)
+ drv := driver.NewSeaweedFsDriver(*driverName, *filer, *nodeID, *endpoint, *mountEndpoint, *enableAttacher)
drv.RunNode = runNode
drv.RunController = runController
diff --git a/cmd/seaweedfs-mount/Dockerfile b/cmd/seaweedfs-mount/Dockerfile
new file mode 100644
index 0000000..689037b
--- /dev/null
+++ b/cmd/seaweedfs-mount/Dockerfile
@@ -0,0 +1,20 @@
+FROM golang:1.23-alpine AS builder
+
+RUN apk add git g++
+
+RUN mkdir -p /go/src/github.com/seaweedfs/
+RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
+RUN cd /go/src/github.com/seaweedfs/seaweedfs/weed && go install
+
+RUN mkdir -p /go/src/github.com/zemul/
+RUN git clone https://github.com/zemul/seaweedfs-csi-driver /go/src/github.com/zemul/seaweedfs-csi-driver
+RUN cd /go/src/github.com/zemul/seaweedfs-csi-driver && \
+ go build -ldflags="-s -w" -o /seaweedfs-mount ./cmd/seaweedfs-mount/main.go
+
+FROM alpine AS final
+RUN apk add fuse
+COPY --from=builder /go/bin/weed /usr/bin/
+COPY --from=builder /seaweedfs-mount /
+
+RUN chmod +x /seaweedfs-mount
+ENTRYPOINT ["/seaweedfs-mount"] \ No newline at end of file
diff --git a/cmd/seaweedfs-mount/main.go b/cmd/seaweedfs-mount/main.go
new file mode 100644
index 0000000..174b530
--- /dev/null
+++ b/cmd/seaweedfs-mount/main.go
@@ -0,0 +1,153 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/mountmanager"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+var (
+ endpoint = flag.String("endpoint", "unix:///tmp/seaweedfs-mount.sock", "endpoint the mount service listens on")
+ weedBinary = flag.String("weedBinary", mountmanager.DefaultWeedBinary, "path to the weed binary")
+)
+
+func main() {
+ flag.Parse()
+
+ scheme, address, err := mountmanager.ParseEndpoint(*endpoint)
+ if err != nil {
+ glog.Fatalf("invalid endpoint: %v", err)
+ }
+ if scheme != "unix" {
+ glog.Fatalf("unsupported endpoint scheme: %s", scheme)
+ }
+
+ if err := os.Remove(address); err != nil && !errors.Is(err, os.ErrNotExist) {
+ glog.Fatalf("removing existing socket: %v", err)
+ }
+
+ listener, err := net.Listen("unix", address)
+ if err != nil {
+ glog.Fatalf("failed to listen on %s: %v", address, err)
+ }
+ defer func() {
+ _ = listener.Close()
+ _ = os.Remove(address)
+ }()
+
+ manager := mountmanager.NewManager(mountmanager.Config{WeedBinary: *weedBinary})
+
+ mux := http.NewServeMux()
+ mux.HandleFunc("/mount", func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+
+ var req mountmanager.MountRequest
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ writeError(w, http.StatusBadRequest, "invalid request: "+err.Error())
+ return
+ }
+
+ resp, err := manager.Mount(&req)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ writeJSON(w, http.StatusOK, resp)
+ })
+
+ mux.HandleFunc("/unmount", func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+
+ var req mountmanager.UnmountRequest
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ writeError(w, http.StatusBadRequest, "invalid request: "+err.Error())
+ return
+ }
+
+ resp, err := manager.Unmount(&req)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ writeJSON(w, http.StatusOK, resp)
+ })
+
+ mux.HandleFunc("/configure", func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+
+ var req mountmanager.ConfigureRequest
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ writeError(w, http.StatusBadRequest, "invalid request: "+err.Error())
+ return
+ }
+
+ resp, err := manager.Configure(&req)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ writeJSON(w, http.StatusOK, resp)
+ })
+
+ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte("ok"))
+ })
+
+ server := &http.Server{Handler: mux}
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ go func() {
+ if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ glog.Fatalf("server error: %v", err)
+ }
+ }()
+
+ glog.Infof("mount service listening on %s", *endpoint)
+
+ <-ctx.Done()
+
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := server.Shutdown(shutdownCtx); err != nil {
+ glog.Errorf("server shutdown error: %v", err)
+ }
+
+ glog.Infof("mount service stopped")
+}
+
+func writeJSON(w http.ResponseWriter, status int, data interface{}) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(status)
+ if err := json.NewEncoder(w).Encode(data); err != nil {
+ glog.Errorf("writing response failed: %v", err)
+ }
+}
+
+func writeError(w http.ResponseWriter, status int, message string) {
+ writeJSON(w, status, mountmanager.ErrorResponse{Error: message})
+}