aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/seaweedfs-csi-driver/main.go9
-rw-r--r--deploy/kubernetes/provisioner.yaml102
-rw-r--r--pkg/driver/controllerserver.go9
-rw-r--r--pkg/driver/driver.go40
-rw-r--r--pkg/driver/mounter.go30
-rw-r--r--pkg/driver/mounter_seaweedfs.go4
-rw-r--r--pkg/driver/nodeserver.go3
7 files changed, 149 insertions, 48 deletions
diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go
index 559bad0..a654992 100644
--- a/cmd/seaweedfs-csi-driver/main.go
+++ b/cmd/seaweedfs-csi-driver/main.go
@@ -10,9 +10,10 @@ import (
)
var (
- endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
- nodeID = flag.String("nodeid", "", "node id")
- version = flag.Bool("version", false, "Print the version and exit.")
+ filer = flag.String("filer", "localhost:8888", "filer server")
+ endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
+ nodeID = flag.String("nodeid", "", "node id")
+ version = flag.Bool("version", false, "Print the version and exit.")
)
func main() {
@@ -28,6 +29,6 @@ func main() {
os.Exit(0)
}
- drv := driver.NewSeaweedFsDriver(*nodeID, *endpoint)
+ drv := driver.NewSeaweedFsDriver(*filer, *nodeID, *endpoint)
drv.Run()
}
diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml
new file mode 100644
index 0000000..226257e
--- /dev/null
+++ b/deploy/kubernetes/provisioner.yaml
@@ -0,0 +1,102 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: csi-provisioner-sa
+ namespace: default
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: external-provisioner-runner
+rules:
+ - apiGroups: [""]
+ resources: ["secrets"]
+ verbs: ["get", "list"]
+ - apiGroups: [""]
+ resources: ["persistentvolumes"]
+ verbs: ["get", "list", "watch", "create", "delete"]
+ - apiGroups: [""]
+ resources: ["persistentvolumeclaims"]
+ verbs: ["get", "list", "watch", "update"]
+ - apiGroups: ["storage.k8s.io"]
+ resources: ["storageclasses"]
+ verbs: ["get", "list", "watch"]
+ - apiGroups: [""]
+ resources: ["events"]
+ verbs: ["list", "watch", "create", "update", "patch"]
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: csi-provisioner-role
+subjects:
+ - kind: ServiceAccount
+ name: csi-provisioner-sa
+ namespace: default
+roleRef:
+ kind: ClusterRole
+ name: external-provisioner-runner
+ apiGroup: rbac.authorization.k8s.io
+---
+kind: Service
+apiVersion: v1
+metadata:
+ name: csi-provisioner-seaweedfs
+ namespace: default
+ labels:
+ app: csi-provisioner-seaweedfs
+spec:
+ selector:
+ app: csi-provisioner-seaweedfs
+ ports:
+ - name: dummy
+ port: 12345
+---
+kind: StatefulSet
+apiVersion: apps/v1beta1
+metadata:
+ name: csi-provisioner-seaweedfs
+ namespace: default
+spec:
+ serviceName: "csi-provisioner-seaweedfs"
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: csi-provisioner-seaweedfs
+ spec:
+ serviceAccount: csi-provisioner-sa
+ containers:
+ - name: csi-provisioner
+ image: quay.io/k8scsi/csi-provisioner:v1.1.0
+ args:
+ - "--provisioner=seaweedfs-csi-driver"
+ - "--csi-address=$(ADDRESS)"
+ - "--v=4"
+ env:
+ - name: ADDRESS
+ value: /var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock
+ imagePullPolicy: "IfNotPresent"
+ volumeMounts:
+ - name: socket-dir
+ mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver
+ - name: csi-seaweedfs
+ image: seaweedfs/csi:v1.1.1
+ args:
+ - "--endpoint=$(CSI_ENDPOINT)"
+ - "--nodeid=$(NODE_ID)"
+ - "--v=4"
+ env:
+ - name: CSI_ENDPOINT
+ value: unix:///var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock
+ - name: NODE_ID
+ valueFrom:
+ fieldRef:
+ fieldPath: spec.nodeName
+ imagePullPolicy: "Always"
+ volumeMounts:
+ - name: socket-dir
+ mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver
+ volumes:
+ - name: socket-dir
+ emptyDir: {}
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index c0eeed3..78466e2 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -50,8 +50,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
seaweedFsVolumeCount = 1
}
- cfg := newConfigFromSecrets(req.GetSecrets())
- if err := filer_pb.Mkdir(cfg, "/buckets", volumeId, nil); err != nil {
+ if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
@@ -81,8 +80,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
glog.V(4).Infof("Deleting volume %s", volumeId)
- cfg := newConfigFromSecrets(req.GetSecrets())
- if err := filer_pb.Remove(cfg, "/buckets", volumeId, true, true, true); err != nil {
+ if err := filer_pb.Remove(cs.Driver, "/buckets", volumeId, true, true, true); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
@@ -107,8 +105,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
- cfg := newConfigFromSecrets(req.GetSecrets())
- exists, err := filer_pb.Exists(cfg, "/buckets", req.GetVolumeId(), true)
+ exists, err := filer_pb.Exists(cs.Driver, "/buckets", req.GetVolumeId(), true)
if err != nil {
return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err)
}
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index df46223..1261a86 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -4,12 +4,17 @@ import (
"fmt"
"os"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "k8s.io/klog"
"k8s.io/client-go/rest"
+ "k8s.io/klog"
)
const (
@@ -30,17 +35,21 @@ type SeaweedFsDriver struct {
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
+ filer string
+ grpcDialOption grpc.DialOption
}
-func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver {
+func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
glog.Infof("Driver: %v version: %v", driverName, version)
n := &SeaweedFsDriver{
- endpoint: endpoint,
- nodeID: nodeID,
- name: driverName,
- version: version,
+ endpoint: endpoint,
+ nodeID: nodeID,
+ name: driverName,
+ version: version,
+ filer: filer,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
}
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
@@ -106,3 +115,22 @@ func (d *SeaweedFsDriver) ValidateControllerServiceRequest(c csi.ControllerServi
}
return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c))
}
+
+var _ = filer_pb.FilerClient(&SeaweedFsDriver{})
+
+func (d *SeaweedFsDriver) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(d.filer)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
+ }
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, d.grpcDialOption)
+
+}
+func (d *SeaweedFsDriver) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index e94544d..38dd1de 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -4,12 +4,7 @@ import (
"fmt"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/glog"
- "google.golang.org/grpc"
"os/exec"
"k8s.io/utils/mount"
)
@@ -24,8 +19,8 @@ type Mounter interface {
Mount(target string) error
}
-func newMounter(bucketName string, cfg *Config) (Mounter, error) {
- return newSeaweedFsMounter(bucketName, cfg)
+func newMounter(bucketName string, filer string) (Mounter, error) {
+ return newSeaweedFsMounter(bucketName, filer)
}
func fuseMount(path string, command string, args []string) error {
@@ -64,24 +59,3 @@ func newConfigFromSecrets(secrets map[string]string) *Config {
}
return t
}
-
-var _ = filer_pb.FilerClient(&Config{})
-
-func (cfg *Config) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
-
- filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(cfg.Filer)
- if parseErr != nil {
- return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
- }
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, grpcDialOption)
-
-}
-func (cfg *Config) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
-}
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index 12fa1e7..4d448de 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -14,10 +14,10 @@ const (
seaweedFsCmd = "weed"
)
-func newSeaweedFsMounter(bucketName string, cfg *Config) (Mounter, error) {
+func newSeaweedFsMounter(bucketName string, filer string) (Mounter, error) {
return &seaweedFsMounter{
bucketName: bucketName,
- filerUrl: cfg.Filer,
+ filerUrl: filer,
}, nil
}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index faf35ba..b58b23b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -50,8 +50,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
mo = append(mo, "ro")
}
- cfg := newConfigFromSecrets(req.GetSecrets())
- mounter, err := newMounter(volumeID, cfg)
+ mounter, err := newMounter(volumeID, ns.Driver.filer)
if err != nil {
return nil, err
}