aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-19 02:59:45 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-19 02:59:45 -0700
commitb21fb2e2b62dd9ec3cf13403e87687229424f1e0 (patch)
tree9b2d9987e9c431cb52d5b13369928fb128960cb3 /pkg
downloadseaweedfs-csi-driver-b21fb2e2b62dd9ec3cf13403e87687229424f1e0.tar.xz
seaweedfs-csi-driver-b21fb2e2b62dd9ec3cf13403e87687229424f1e0.zip
WIP
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/controllerserver.go104
-rw-r--r--pkg/driver/driver.go102
-rw-r--r--pkg/driver/identityserver.go49
-rw-r--r--pkg/driver/nodeserver.go105
-rw-r--r--pkg/driver/server.go96
-rw-r--r--pkg/driver/utils.go52
-rw-r--r--pkg/driver/version.go53
7 files changed, 561 insertions, 0 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
new file mode 100644
index 0000000..e176866
--- /dev/null
+++ b/pkg/driver/controllerserver.go
@@ -0,0 +1,104 @@
+package driver
+
+import (
+ "context"
+
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/golang/glog"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type ControllerServer struct {
+ Driver *SeaweedFsDriver
+}
+
+var _ = csi.ControllerServer(&ControllerServer{})
+
+func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
+
+ volumeId := req.GetName()
+ params := req.GetParameters()
+ glog.V(4).Info("params:%v", params)
+ capacity := req.GetCapacityRange().GetRequiredBytes()
+ capacityGB := capacity >> 30
+ if capacityGB == 0 {
+ return nil, status.Error(codes.InvalidArgument, "required bytes less than 1GB")
+ }
+ seaweedFsVolumeCount := capacityGB / 30
+ if seaweedFsVolumeCount == 0 {
+ seaweedFsVolumeCount = 1
+ }
+
+ err := cs.Driver.createBucket(volumeId, int(seaweedFsVolumeCount))
+
+ return &csi.CreateVolumeResponse{
+ Volume: &csi.Volume{
+ VolumeId: volumeId,
+ CapacityBytes: 0, // seaweedFsVolumeCount * 1024 * 1024 * 30,
+ VolumeContext: params,
+ },
+ }, err
+}
+
+func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
+
+ volumeId := req.VolumeId
+ err := cs.Driver.deleteBucket(volumeId)
+
+ return &csi.DeleteVolumeResponse{}, err
+}
+
+func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
+
+ return &csi.ValidateVolumeCapabilitiesResponse{
+ Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
+ VolumeContext: req.GetVolumeContext(),
+ VolumeCapabilities: req.GetVolumeCapabilities(),
+ Parameters: req.GetParameters(),
+ },
+ }, nil
+
+}
+
+func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+// ControllerGetCapabilities implements the default GRPC callout.
+// Default supports all capabilities
+func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
+ glog.V(5).Infof("Using default ControllerGetCapabilities")
+
+ return &csi.ControllerGetCapabilitiesResponse{
+ Capabilities: cs.Driver.cscap,
+ }, nil
+}
+
+func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
new file mode 100644
index 0000000..d090613
--- /dev/null
+++ b/pkg/driver/driver.go
@@ -0,0 +1,102 @@
+package driver
+
+import (
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/golang/glog"
+)
+
+const (
+ driverName = "csi.seaweedfs.com"
+)
+
+var (
+ version = "1.0.0-rc1"
+)
+
+type SeaweedFsDriver struct {
+ name string
+ nodeID string
+ version string
+
+ endpoint string
+
+ vcap []*csi.VolumeCapability_AccessMode
+ cscap []*csi.ControllerServiceCapability
+
+ filer string
+ pathOnFiler string
+}
+
+func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver {
+
+ glog.Infof("Driver: %v version: %v", driverName, version)
+
+ n := &SeaweedFsDriver{
+ endpoint: endpoint,
+ nodeID: nodeID,
+ name: driverName,
+ version: version,
+ }
+
+ n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
+ csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+ })
+ n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
+ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
+ })
+
+ return n
+}
+
+func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
+
+ return &NodeServer{
+ Driver: n,
+ }
+}
+
+func (n *SeaweedFsDriver) Run() {
+ s := NewNonBlockingGRPCServer()
+ s.Start(n.endpoint,
+ NewIdentityServer(n),
+ NewControllerServer(n),
+ NewNodeServer(n))
+ s.Wait()
+}
+
+func (n *SeaweedFsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
+ var vca []*csi.VolumeCapability_AccessMode
+ for _, c := range vc {
+ glog.Infof("Enabling volume access mode: %v", c.String())
+ vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c})
+ }
+ n.vcap = vca
+ return vca
+}
+
+func (n *SeaweedFsDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
+ var csc []*csi.ControllerServiceCapability
+
+ for _, c := range cl {
+ glog.Infof("Enabling controller service capability: %v", c.String())
+ csc = append(csc, NewControllerServiceCapability(c))
+ }
+
+ n.cscap = csc
+
+ return
+}
+
+func (n *SeaweedFsDriver) createBucket(volumeId string, seaweedFsVolumeCount int) error {
+ // TODO implement seaweedFsVolumeCount later
+ return nil
+}
+func (n *SeaweedFsDriver) deleteBucket(volumeId string) error {
+ return nil
+}
+func (n *SeaweedFsDriver) mount(source string, targetPath string) error {
+ return nil
+}
+func (n *SeaweedFsDriver) unmount(targetPath string) error {
+ return nil
+}
diff --git a/pkg/driver/identityserver.go b/pkg/driver/identityserver.go
new file mode 100644
index 0000000..f92b225
--- /dev/null
+++ b/pkg/driver/identityserver.go
@@ -0,0 +1,49 @@
+package driver
+
+import (
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/golang/glog"
+ "golang.org/x/net/context"
+)
+
+type IdentityServer struct {
+ Driver *SeaweedFsDriver
+}
+
+var _ = csi.IdentityServer(&IdentityServer{})
+
+func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
+
+ return &csi.GetPluginInfoResponse{
+ Name: ids.Driver.name,
+ VendorVersion: ids.Driver.version,
+ }, nil
+}
+
+func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
+ return &csi.ProbeResponse{}, nil
+}
+
+func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+ glog.V(4).Infof("Using default capabilities")
+ return &csi.GetPluginCapabilitiesResponse{
+ Capabilities: []*csi.PluginCapability{
+ {
+ Type: &csi.PluginCapability_Service_{
+ Service: &csi.PluginCapability_Service{
+ Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
+ },
+ },
+ },
+ /* // TODO add later
+ {
+ Type: &csi.PluginCapability_VolumeExpansion_{
+ VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
+ Type: csi.PluginCapability_VolumeExpansion_ONLINE,
+ },
+ },
+ },
+ */
+ },
+ }, nil
+}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
new file mode 100644
index 0000000..abef4a4
--- /dev/null
+++ b/pkg/driver/nodeserver.go
@@ -0,0 +1,105 @@
+package driver
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/golang/glog"
+
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "k8s.io/utils/mount"
+)
+
+type NodeServer struct {
+ Driver *SeaweedFsDriver
+ mounter mount.Interface
+}
+
+var _ = csi.NodeServer(&NodeServer{})
+
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+ // mount the fs here
+ targetPath := req.GetTargetPath()
+
+ mo := req.GetVolumeCapability().GetMount().GetMountFlags()
+ if req.GetReadonly() {
+ mo = append(mo, "ro")
+ }
+
+ source := fmt.Sprintf("%s%s", ns.Driver.filer, ns.Driver.pathOnFiler)
+
+ err := ns.Driver.mount(source, targetPath)
+
+ if err != nil {
+ if os.IsPermission(err) {
+ return nil, status.Error(codes.PermissionDenied, err.Error())
+ }
+ if strings.Contains(err.Error(), "invalid argument") {
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
+ return &csi.NodePublishVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+ targetPath := req.GetTargetPath()
+
+ err := ns.Driver.unmount(targetPath)
+
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
+ err = os.Remove(targetPath)
+ if err != nil && !os.IsNotExist(err) {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+
+ return &csi.NodeUnpublishVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+ glog.V(5).Infof("Using default NodeGetInfo")
+
+ return &csi.NodeGetInfoResponse{
+ NodeId: ns.Driver.nodeID,
+ }, nil
+}
+
+func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+ glog.V(5).Infof("Using default NodeGetCapabilities")
+
+ return &csi.NodeGetCapabilitiesResponse{
+ Capabilities: []*csi.NodeServiceCapability{
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_UNKNOWN,
+ },
+ },
+ },
+ },
+ }, nil
+}
+
+func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
+
+func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ return &csi.NodeUnstageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+ return &csi.NodeStageVolumeResponse{}, nil
+}
+
+func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "")
+}
diff --git a/pkg/driver/server.go b/pkg/driver/server.go
new file mode 100644
index 0000000..64a9333
--- /dev/null
+++ b/pkg/driver/server.go
@@ -0,0 +1,96 @@
+package driver
+
+import (
+ "net"
+ "os"
+ "sync"
+
+ "github.com/golang/glog"
+ "google.golang.org/grpc"
+
+ "github.com/container-storage-interface/spec/lib/go/csi"
+)
+
+// Defines Non blocking GRPC server interfaces
+type NonBlockingGRPCServer interface {
+ // Start services at the endpoint
+ Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
+ // Waits for the service to stop
+ Wait()
+ // Stops the service gracefully
+ Stop()
+ // Stops the service forcefully
+ ForceStop()
+}
+
+func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
+ return &nonBlockingGRPCServer{}
+}
+
+// NonBlocking server
+type nonBlockingGRPCServer struct {
+ wg sync.WaitGroup
+ server *grpc.Server
+}
+
+func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
+
+ s.wg.Add(1)
+
+ go s.serve(endpoint, ids, cs, ns)
+
+ return
+}
+
+func (s *nonBlockingGRPCServer) Wait() {
+ s.wg.Wait()
+}
+
+func (s *nonBlockingGRPCServer) Stop() {
+ s.server.GracefulStop()
+}
+
+func (s *nonBlockingGRPCServer) ForceStop() {
+ s.server.Stop()
+}
+
+func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
+
+ proto, addr, err := ParseEndpoint(endpoint)
+ if err != nil {
+ glog.Fatal(err.Error())
+ }
+
+ if proto == "unix" {
+ addr = "/" + addr
+ if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
+ }
+ }
+
+ listener, err := net.Listen(proto, addr)
+ if err != nil {
+ glog.Fatalf("Failed to listen: %v", err)
+ }
+
+ opts := []grpc.ServerOption{
+ grpc.UnaryInterceptor(logGRPC),
+ }
+ server := grpc.NewServer(opts...)
+ s.server = server
+
+ if ids != nil {
+ csi.RegisterIdentityServer(server, ids)
+ }
+ if cs != nil {
+ csi.RegisterControllerServer(server, cs)
+ }
+ if ns != nil {
+ csi.RegisterNodeServer(server, ns)
+ }
+
+ glog.Infof("Listening for connections on address: %#v", listener.Addr())
+
+ server.Serve(listener)
+
+} \ No newline at end of file
diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go
new file mode 100644
index 0000000..2334e27
--- /dev/null
+++ b/pkg/driver/utils.go
@@ -0,0 +1,52 @@
+package driver
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/golang/glog"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+)
+
+func NewIdentityServer(d *SeaweedFsDriver) *IdentityServer {
+ return &IdentityServer{
+ Driver: d,
+ }
+}
+
+func NewControllerServer(d *SeaweedFsDriver) *ControllerServer {
+ return &ControllerServer{
+ Driver: d,
+ }
+}
+
+func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
+ return &csi.ControllerServiceCapability{
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{
+ Type: cap,
+ },
+ },
+ }
+}
+
+func ParseEndpoint(ep string) (string, string, error) {
+ if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
+ s := strings.SplitN(ep, "://", 2)
+ if s[1] != "" {
+ return s[0], s[1], nil
+ }
+ }
+ return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
+}
+
+func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ glog.V(3).Infof("GRPC call: %s", info.FullMethod)
+ resp, err := handler(ctx, req)
+ if err != nil {
+ glog.Errorf("GRPC error: %v", err)
+ }
+ return resp, err
+}
diff --git a/pkg/driver/version.go b/pkg/driver/version.go
new file mode 100644
index 0000000..258267a
--- /dev/null
+++ b/pkg/driver/version.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package driver
+
+import (
+ "encoding/json"
+ "fmt"
+ "runtime"
+)
+
+var (
+ driverVersion string
+ gitCommit string
+ buildDate string
+)
+
+type VersionInfo struct {
+ DriverVersion string `json:"driverVersion"`
+ GitCommit string `json:"gitCommit"`
+ BuildDate string `json:"buildDate"`
+ GoVersion string `json:"goVersion"`
+ Compiler string `json:"compiler"`
+ Platform string `json:"platform"`
+}
+
+func GetVersion() VersionInfo {
+ return VersionInfo{
+ DriverVersion: driverVersion,
+ GitCommit: gitCommit,
+ BuildDate: buildDate,
+ GoVersion: runtime.Version(),
+ Compiler: runtime.Compiler,
+ Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
+ }
+}
+func GetVersionJSON() (string, error) {
+ info := GetVersion()
+ marshalled, err := json.MarshalIndent(&info, "", " ")
+ if err != nil {
+ return "", err
+ }
+ return string(marshalled), nil
+}