aboutsummaryrefslogtreecommitdiff
path: root/pkg/driver
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/driver')
-rw-r--r--pkg/driver/controllerserver.go89
-rw-r--r--pkg/driver/driver.go42
-rw-r--r--pkg/driver/mounter.go87
-rw-r--r--pkg/driver/mounter_seaweedfs.go33
-rw-r--r--pkg/driver/nodeserver.go75
-rw-r--r--pkg/driver/util.go87
-rw-r--r--pkg/driver/utils.go7
7 files changed, 388 insertions, 32 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index e176866..c0eeed3 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -2,7 +2,13 @@ package driver
import (
"context"
+ "crypto/sha1"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "strings"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"google.golang.org/grpc/codes"
@@ -17,7 +23,21 @@ var _ = csi.ControllerServer(&ControllerServer{})
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
- volumeId := req.GetName()
+ volumeId := sanitizeVolumeId(req.GetName())
+
+ if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
+ glog.V(3).Infof("invalid create volume req: %v", req)
+ return nil, err
+ }
+
+ // Check arguments
+ if volumeId == "" {
+ return nil, status.Error(codes.InvalidArgument, "Name missing in request")
+ }
+ if req.GetVolumeCapabilities() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
+ }
+
params := req.GetParameters()
glog.V(4).Info("params:%v", params)
capacity := req.GetCapacityRange().GetRequiredBytes()
@@ -30,7 +50,12 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
seaweedFsVolumeCount = 1
}
- err := cs.Driver.createBucket(volumeId, int(seaweedFsVolumeCount))
+ cfg := newConfigFromSecrets(req.GetSecrets())
+ if err := filer_pb.Mkdir(cfg, "/buckets", volumeId, nil); err != nil {
+ return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
+ }
+
+ glog.V(4).Infof("create volume %s", volumeId)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -38,15 +63,30 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
CapacityBytes: 0, // seaweedFsVolumeCount * 1024 * 1024 * 30,
VolumeContext: params,
},
- }, err
+ }, nil
}
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeId := req.VolumeId
- err := cs.Driver.deleteBucket(volumeId)
- return &csi.DeleteVolumeResponse{}, err
+ // Check arguments
+ if volumeId == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+
+ if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
+ glog.V(3).Infof("Invalid delete volume req: %v", req)
+ return nil, err
+ }
+ glog.V(4).Infof("Deleting volume %s", volumeId)
+
+ cfg := newConfigFromSecrets(req.GetSecrets())
+ if err := filer_pb.Remove(cfg, "/buckets", volumeId, true, true, true); err != nil {
+ return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
+ }
+
+ return &csi.DeleteVolumeResponse{}, nil
}
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
@@ -59,6 +99,35 @@ func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
+ // Check arguments
+ if req.GetVolumeId() == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if req.GetVolumeCapabilities() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
+ }
+
+ cfg := newConfigFromSecrets(req.GetSecrets())
+ exists, err := filer_pb.Exists(cfg, "/buckets", req.GetVolumeId(), true)
+ if err != nil {
+ return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err)
+ }
+ if !exists {
+ // return an error if the volume requested does not exist
+ return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
+ }
+
+ // We currently only support RWO
+ supportedAccessMode := &csi.VolumeCapability_AccessMode{
+ Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+ }
+
+ for _, cap := range req.VolumeCapabilities {
+ if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
+ return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
+ }
+ }
+
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
@@ -102,3 +171,13 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
+
+func sanitizeVolumeId(volumeId string) string {
+ volumeId = strings.ToLower(volumeId)
+ if len(volumeId) > 63 {
+ h := sha1.New()
+ io.WriteString(h, volumeId)
+ volumeId = hex.EncodeToString(h.Sum(nil))
+ }
+ return volumeId
+} \ No newline at end of file
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index d090613..df46223 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -1,8 +1,15 @@
package driver
import (
+ "fmt"
+ "os"
+
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "k8s.io/klog"
+ "k8s.io/client-go/rest"
)
const (
@@ -23,8 +30,6 @@ type SeaweedFsDriver struct {
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
- filer string
- pathOnFiler string
}
func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver {
@@ -48,11 +53,13 @@ func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver {
return n
}
-func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
-
- return &NodeServer{
- Driver: n,
+func (n *SeaweedFsDriver) initClient() error {
+ _, err := rest.InClusterConfig()
+ if err != nil {
+ klog.Errorf("Failed to get cluster config with error: %v\n", err)
+ os.Exit(1)
}
+ return nil
}
func (n *SeaweedFsDriver) Run() {
@@ -87,16 +94,15 @@ func (n *SeaweedFsDriver) AddControllerServiceCapabilities(cl []csi.ControllerSe
return
}
-func (n *SeaweedFsDriver) createBucket(volumeId string, seaweedFsVolumeCount int) error {
- // TODO implement seaweedFsVolumeCount later
- return nil
-}
-func (n *SeaweedFsDriver) deleteBucket(volumeId string) error {
- return nil
-}
-func (n *SeaweedFsDriver) mount(source string, targetPath string) error {
- return nil
-}
-func (n *SeaweedFsDriver) unmount(targetPath string) error {
- return nil
+func (d *SeaweedFsDriver) ValidateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error {
+ if c == csi.ControllerServiceCapability_RPC_UNKNOWN {
+ return nil
+ }
+
+ for _, cap := range d.cscap {
+ if c == cap.GetRpc().GetType() {
+ return nil
+ }
+ }
+ return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c))
}
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
new file mode 100644
index 0000000..e94544d
--- /dev/null
+++ b/pkg/driver/mounter.go
@@ -0,0 +1,87 @@
+package driver
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/glog"
+ "google.golang.org/grpc"
+ "os/exec"
+ "k8s.io/utils/mount"
+)
+
+// Config holds values to configure the driver
+type Config struct {
+ // Region string
+ Filer string
+}
+
+type Mounter interface {
+ Mount(target string) error
+}
+
+func newMounter(bucketName string, cfg *Config) (Mounter, error) {
+ return newSeaweedFsMounter(bucketName, cfg)
+}
+
+func fuseMount(path string, command string, args []string) error {
+ cmd := exec.Command(command, args...)
+ glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args)
+
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
+ }
+
+ return waitForMount(path, 10*time.Second)
+}
+
+func fuseUnmount(path string) error {
+ if err := mount.New("").Unmount(path); err != nil {
+ return err
+ }
+ // as fuse quits immediately, we will try to wait until the process is done
+ process, err := findFuseMountProcess(path)
+ if err != nil {
+ glog.Errorf("Error getting PID of fuse mount: %s", err)
+ return nil
+ }
+ if process == nil {
+ glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
+ return nil
+ }
+ glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
+ return waitForProcess(process, 1)
+}
+
+func newConfigFromSecrets(secrets map[string]string) *Config {
+ t := &Config{
+ Filer: secrets["filer"],
+ }
+ return t
+}
+
+var _ = filer_pb.FilerClient(&Config{})
+
+func (cfg *Config) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(cfg.Filer)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, grpcDialOption)
+
+}
+func (cfg *Config) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
new file mode 100644
index 0000000..12fa1e7
--- /dev/null
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -0,0 +1,33 @@
+package driver
+
+import (
+ "fmt"
+)
+
+// Implements Mounter
+type seaweedFsMounter struct {
+ bucketName string
+ filerUrl string
+}
+
+const (
+ seaweedFsCmd = "weed"
+)
+
+func newSeaweedFsMounter(bucketName string, cfg *Config) (Mounter, error) {
+ return &seaweedFsMounter{
+ bucketName: bucketName,
+ filerUrl: cfg.Filer,
+ }, nil
+}
+
+func (seaweedFs *seaweedFsMounter) Mount(target string) error {
+ args := []string{
+ "mount",
+ fmt.Sprintf("-dir=%s", target),
+ fmt.Sprintf("-collection=%s", seaweedFs.bucketName),
+ fmt.Sprintf("-filer=%s", seaweedFs.filerUrl),
+ fmt.Sprintf("-filer.path=/buckets/%s", seaweedFs.bucketName),
+ }
+ return fuseMount(target, seaweedFsCmd, args)
+}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index abef4a4..faf35ba 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -2,7 +2,6 @@ package driver
import (
"context"
- "fmt"
"os"
"strings"
@@ -22,19 +21,41 @@ type NodeServer struct {
var _ = csi.NodeServer(&NodeServer{})
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
// mount the fs here
targetPath := req.GetTargetPath()
+ // Check arguments
+ if req.GetVolumeCapability() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
+ }
+ if volumeID == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if targetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+
+ // check whether it can be mounted
+ notMnt, err := checkMount(targetPath)
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ if !notMnt {
+ return &csi.NodePublishVolumeResponse{}, nil
+ }
+
mo := req.GetVolumeCapability().GetMount().GetMountFlags()
if req.GetReadonly() {
mo = append(mo, "ro")
}
- source := fmt.Sprintf("%s%s", ns.Driver.filer, ns.Driver.pathOnFiler)
-
- err := ns.Driver.mount(source, targetPath)
-
+ cfg := newConfigFromSecrets(req.GetSecrets())
+ mounter, err := newMounter(volumeID, cfg)
if err != nil {
+ return nil, err
+ }
+ if err := mounter.Mount(targetPath); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
@@ -44,19 +65,24 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.Internal, err.Error())
}
+ glog.V(4).Infof("volume %s successfully mounted to %s", volumeID, targetPath)
+
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+
targetPath := req.GetTargetPath()
- err := ns.Driver.unmount(targetPath)
+ if targetPath == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
- if err != nil {
+ if err := fuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
- err = os.Remove(targetPath)
+ err := os.Remove(targetPath)
if err != nil && !os.IsNotExist(err) {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -80,6 +106,7 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
+ // Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
Type: csi.NodeServiceCapability_RPC_UNKNOWN,
},
},
@@ -93,13 +120,43 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol
}
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ // Check arguments
+ if req.GetVolumeId() == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if req.GetStagingTargetPath() == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
+
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+ // Check arguments
+ if req.GetVolumeId() == "" {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+ if req.GetStagingTargetPath() == "" {
+ return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
+ }
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
- return nil, status.Error(codes.Unimplemented, "")
+ return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}
+
+func checkMount(targetPath string) (bool, error) {
+ notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ if err = os.MkdirAll(targetPath, 0750); err != nil {
+ return false, err
+ }
+ notMnt = true
+ } else {
+ return false, err
+ }
+ }
+ return notMnt, nil
+} \ No newline at end of file
diff --git a/pkg/driver/util.go b/pkg/driver/util.go
new file mode 100644
index 0000000..62a6c09
--- /dev/null
+++ b/pkg/driver/util.go
@@ -0,0 +1,87 @@
+package driver
+
+import (
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/mitchellh/go-ps"
+ "github.com/golang/glog"
+ "k8s.io/utils/mount"
+)
+
+func waitForProcess(p *os.Process, backoff int) error {
+ if backoff == 20 {
+ return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
+ }
+ cmdLine, err := getCmdLine(p.Pid)
+ if err != nil {
+ glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err)
+ return nil
+ }
+ if cmdLine == "" {
+ // ignore defunct processes
+ // TODO: debug why this happens in the first place
+ // seems to only happen on k8s, not on local docker
+ glog.Warning("Fuse process seems dead, returning")
+ return nil
+ }
+ if err := p.Signal(syscall.Signal(0)); err != nil {
+ glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
+ return nil
+ }
+ glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
+ time.Sleep(time.Duration(backoff*100) * time.Millisecond)
+ return waitForProcess(p, backoff+1)
+}
+
+func waitForMount(path string, timeout time.Duration) error {
+ var elapsed time.Duration
+ var interval = 10 * time.Millisecond
+ for {
+ notMount, err := mount.New("").IsLikelyNotMountPoint(path)
+ if err != nil {
+ return err
+ }
+ if !notMount {
+ return nil
+ }
+ time.Sleep(interval)
+ elapsed = elapsed + interval
+ if elapsed >= timeout {
+ return errors.New("Timeout waiting for mount")
+ }
+ }
+}
+
+func findFuseMountProcess(path string) (*os.Process, error) {
+ processes, err := ps.Processes()
+ if err != nil {
+ return nil, err
+ }
+ for _, p := range processes {
+ cmdLine, err := getCmdLine(p.Pid())
+ if err != nil {
+ glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
+ continue
+ }
+ if strings.Contains(cmdLine, path) {
+ glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
+ return os.FindProcess(p.Pid())
+ }
+ }
+ return nil, nil
+}
+
+func getCmdLine(pid int) (string, error) {
+ cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
+ cmdLine, err := ioutil.ReadFile(cmdLineFile)
+ if err != nil {
+ return "", err
+ }
+ return string(cmdLine), nil
+}
diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go
index 2334e27..b901182 100644
--- a/pkg/driver/utils.go
+++ b/pkg/driver/utils.go
@@ -10,6 +10,13 @@ import (
"google.golang.org/grpc"
)
+func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
+
+ return &NodeServer{
+ Driver: n,
+ }
+}
+
func NewIdentityServer(d *SeaweedFsDriver) *IdentityServer {
return &IdentityServer{
Driver: d,