diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-31 02:24:20 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-31 02:24:20 -0700 |
| commit | 5e19cfc577d447604d57f5a8c770ae4acaccfbcd (patch) | |
| tree | 346309f201de066a11c1fac03e6a89f380bc3deb /pkg | |
| parent | b21fb2e2b62dd9ec3cf13403e87687229424f1e0 (diff) | |
| download | seaweedfs-csi-driver-5e19cfc577d447604d57f5a8c770ae4acaccfbcd.tar.xz seaweedfs-csi-driver-5e19cfc577d447604d57f5a8c770ae4acaccfbcd.zip | |
it can compile now!
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/driver/controllerserver.go | 89 | ||||
| -rw-r--r-- | pkg/driver/driver.go | 42 | ||||
| -rw-r--r-- | pkg/driver/mounter.go | 87 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 33 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 75 | ||||
| -rw-r--r-- | pkg/driver/util.go | 87 | ||||
| -rw-r--r-- | pkg/driver/utils.go | 7 |
7 files changed, 388 insertions, 32 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index e176866..c0eeed3 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -2,7 +2,13 @@ package driver import ( "context" + "crypto/sha1" + "encoding/hex" + "fmt" + "io" + "strings" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/glog" "google.golang.org/grpc/codes" @@ -17,7 +23,21 @@ var _ = csi.ControllerServer(&ControllerServer{}) func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { - volumeId := req.GetName() + volumeId := sanitizeVolumeId(req.GetName()) + + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + glog.V(3).Infof("invalid create volume req: %v", req) + return nil, err + } + + // Check arguments + if volumeId == "" { + return nil, status.Error(codes.InvalidArgument, "Name missing in request") + } + if req.GetVolumeCapabilities() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") + } + params := req.GetParameters() glog.V(4).Info("params:%v", params) capacity := req.GetCapacityRange().GetRequiredBytes() @@ -30,7 +50,12 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol seaweedFsVolumeCount = 1 } - err := cs.Driver.createBucket(volumeId, int(seaweedFsVolumeCount)) + cfg := newConfigFromSecrets(req.GetSecrets()) + if err := filer_pb.Mkdir(cfg, "/buckets", volumeId, nil); err != nil { + return nil, fmt.Errorf("Error setting bucket metadata: %v", err) + } + + glog.V(4).Infof("create volume %s", volumeId) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -38,15 +63,30 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol CapacityBytes: 0, // seaweedFsVolumeCount * 1024 * 1024 * 30, VolumeContext: params, }, - }, err + }, nil } func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeId := req.VolumeId - err := cs.Driver.deleteBucket(volumeId) - return &csi.DeleteVolumeResponse{}, err + // Check arguments + if volumeId == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + glog.V(3).Infof("Invalid delete volume req: %v", req) + return nil, err + } + glog.V(4).Infof("Deleting volume %s", volumeId) + + cfg := newConfigFromSecrets(req.GetSecrets()) + if err := filer_pb.Remove(cfg, "/buckets", volumeId, true, true, true); err != nil { + return nil, fmt.Errorf("Error setting bucket metadata: %v", err) + } + + return &csi.DeleteVolumeResponse{}, nil } func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { @@ -59,6 +99,35 @@ func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req * func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + // Check arguments + if req.GetVolumeId() == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if req.GetVolumeCapabilities() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") + } + + cfg := newConfigFromSecrets(req.GetSecrets()) + exists, err := filer_pb.Exists(cfg, "/buckets", req.GetVolumeId(), true) + if err != nil { + return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err) + } + if !exists { + // return an error if the volume requested does not exist + return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId())) + } + + // We currently only support RWO + supportedAccessMode := &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + } + + for _, cap := range req.VolumeCapabilities { + if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { + return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil + } + } + return &csi.ValidateVolumeCapabilitiesResponse{ Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ VolumeContext: req.GetVolumeContext(), @@ -102,3 +171,13 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } + +func sanitizeVolumeId(volumeId string) string { + volumeId = strings.ToLower(volumeId) + if len(volumeId) > 63 { + h := sha1.New() + io.WriteString(h, volumeId) + volumeId = hex.EncodeToString(h.Sum(nil)) + } + return volumeId +}
\ No newline at end of file diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index d090613..df46223 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -1,8 +1,15 @@ package driver import ( + "fmt" + "os" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/glog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog" + "k8s.io/client-go/rest" ) const ( @@ -23,8 +30,6 @@ type SeaweedFsDriver struct { vcap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability - filer string - pathOnFiler string } func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver { @@ -48,11 +53,13 @@ func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver { return n } -func NewNodeServer(n *SeaweedFsDriver) *NodeServer { - - return &NodeServer{ - Driver: n, +func (n *SeaweedFsDriver) initClient() error { + _, err := rest.InClusterConfig() + if err != nil { + klog.Errorf("Failed to get cluster config with error: %v\n", err) + os.Exit(1) } + return nil } func (n *SeaweedFsDriver) Run() { @@ -87,16 +94,15 @@ func (n *SeaweedFsDriver) AddControllerServiceCapabilities(cl []csi.ControllerSe return } -func (n *SeaweedFsDriver) createBucket(volumeId string, seaweedFsVolumeCount int) error { - // TODO implement seaweedFsVolumeCount later - return nil -} -func (n *SeaweedFsDriver) deleteBucket(volumeId string) error { - return nil -} -func (n *SeaweedFsDriver) mount(source string, targetPath string) error { - return nil -} -func (n *SeaweedFsDriver) unmount(targetPath string) error { - return nil +func (d *SeaweedFsDriver) ValidateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error { + if c == csi.ControllerServiceCapability_RPC_UNKNOWN { + return nil + } + + for _, cap := range d.cscap { + if c == cap.GetRpc().GetType() { + return nil + } + } + return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c)) } diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go new file mode 100644 index 0000000..e94544d --- /dev/null +++ b/pkg/driver/mounter.go @@ -0,0 +1,87 @@ +package driver + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/glog" + "google.golang.org/grpc" + "os/exec" + "k8s.io/utils/mount" +) + +// Config holds values to configure the driver +type Config struct { + // Region string + Filer string +} + +type Mounter interface { + Mount(target string) error +} + +func newMounter(bucketName string, cfg *Config) (Mounter, error) { + return newSeaweedFsMounter(bucketName, cfg) +} + +func fuseMount(path string, command string, args []string) error { + cmd := exec.Command(command, args...) + glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) + } + + return waitForMount(path, 10*time.Second) +} + +func fuseUnmount(path string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + // as fuse quits immediately, we will try to wait until the process is done + process, err := findFuseMountProcess(path) + if err != nil { + glog.Errorf("Error getting PID of fuse mount: %s", err) + return nil + } + if process == nil { + glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) + return nil + } + glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) + return waitForProcess(process, 1) +} + +func newConfigFromSecrets(secrets map[string]string) *Config { + t := &Config{ + Filer: secrets["filer"], + } + return t +} + +var _ = filer_pb.FilerClient(&Config{}) + +func (cfg *Config) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(cfg.Filer) + if parseErr != nil { + return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) + } + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, grpcDialOption) + +} +func (cfg *Config) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go new file mode 100644 index 0000000..12fa1e7 --- /dev/null +++ b/pkg/driver/mounter_seaweedfs.go @@ -0,0 +1,33 @@ +package driver + +import ( + "fmt" +) + +// Implements Mounter +type seaweedFsMounter struct { + bucketName string + filerUrl string +} + +const ( + seaweedFsCmd = "weed" +) + +func newSeaweedFsMounter(bucketName string, cfg *Config) (Mounter, error) { + return &seaweedFsMounter{ + bucketName: bucketName, + filerUrl: cfg.Filer, + }, nil +} + +func (seaweedFs *seaweedFsMounter) Mount(target string) error { + args := []string{ + "mount", + fmt.Sprintf("-dir=%s", target), + fmt.Sprintf("-collection=%s", seaweedFs.bucketName), + fmt.Sprintf("-filer=%s", seaweedFs.filerUrl), + fmt.Sprintf("-filer.path=/buckets/%s", seaweedFs.bucketName), + } + return fuseMount(target, seaweedFsCmd, args) +} diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index abef4a4..faf35ba 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -2,7 +2,6 @@ package driver import ( "context" - "fmt" "os" "strings" @@ -22,19 +21,41 @@ type NodeServer struct { var _ = csi.NodeServer(&NodeServer{}) func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() // mount the fs here targetPath := req.GetTargetPath() + // Check arguments + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + // check whether it can be mounted + notMnt, err := checkMount(targetPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if !notMnt { + return &csi.NodePublishVolumeResponse{}, nil + } + mo := req.GetVolumeCapability().GetMount().GetMountFlags() if req.GetReadonly() { mo = append(mo, "ro") } - source := fmt.Sprintf("%s%s", ns.Driver.filer, ns.Driver.pathOnFiler) - - err := ns.Driver.mount(source, targetPath) - + cfg := newConfigFromSecrets(req.GetSecrets()) + mounter, err := newMounter(volumeID, cfg) if err != nil { + return nil, err + } + if err := mounter.Mount(targetPath); err != nil { if os.IsPermission(err) { return nil, status.Error(codes.PermissionDenied, err.Error()) } @@ -44,19 +65,24 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.Internal, err.Error()) } + glog.V(4).Infof("volume %s successfully mounted to %s", volumeID, targetPath) + return &csi.NodePublishVolumeResponse{}, nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + targetPath := req.GetTargetPath() - err := ns.Driver.unmount(targetPath) + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } - if err != nil { + if err := fuseUnmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - err = os.Remove(targetPath) + err := os.Remove(targetPath) if err != nil && !os.IsNotExist(err) { return nil, status.Error(codes.Internal, err.Error()) } @@ -80,6 +106,7 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ + // Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, Type: csi.NodeServiceCapability_RPC_UNKNOWN, }, }, @@ -93,13 +120,43 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol } func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + // Check arguments + if req.GetVolumeId() == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if req.GetStagingTargetPath() == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + return &csi.NodeUnstageVolumeResponse{}, nil } func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + // Check arguments + if req.GetVolumeId() == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if req.GetStagingTargetPath() == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } return &csi.NodeStageVolumeResponse{}, nil } func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented") } + +func checkMount(targetPath string) (bool, error) { + notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.MkdirAll(targetPath, 0750); err != nil { + return false, err + } + notMnt = true + } else { + return false, err + } + } + return notMnt, nil +}
\ No newline at end of file diff --git a/pkg/driver/util.go b/pkg/driver/util.go new file mode 100644 index 0000000..62a6c09 --- /dev/null +++ b/pkg/driver/util.go @@ -0,0 +1,87 @@ +package driver + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "strings" + "syscall" + "time" + + "github.com/mitchellh/go-ps" + "github.com/golang/glog" + "k8s.io/utils/mount" +) + +func waitForProcess(p *os.Process, backoff int) error { + if backoff == 20 { + return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) + } + cmdLine, err := getCmdLine(p.Pid) + if err != nil { + glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) + return nil + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + glog.Warning("Fuse process seems dead, returning") + return nil + } + if err := p.Signal(syscall.Signal(0)); err != nil { + glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) + return nil + } + glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) + time.Sleep(time.Duration(backoff*100) * time.Millisecond) + return waitForProcess(p, backoff+1) +} + +func waitForMount(path string, timeout time.Duration) error { + var elapsed time.Duration + var interval = 10 * time.Millisecond + for { + notMount, err := mount.New("").IsLikelyNotMountPoint(path) + if err != nil { + return err + } + if !notMount { + return nil + } + time.Sleep(interval) + elapsed = elapsed + interval + if elapsed >= timeout { + return errors.New("Timeout waiting for mount") + } + } +} + +func findFuseMountProcess(path string) (*os.Process, error) { + processes, err := ps.Processes() + if err != nil { + return nil, err + } + for _, p := range processes { + cmdLine, err := getCmdLine(p.Pid()) + if err != nil { + glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) + continue + } + if strings.Contains(cmdLine, path) { + glog.Infof("Found matching pid %v on path %s", p.Pid(), path) + return os.FindProcess(p.Pid()) + } + } + return nil, nil +} + +func getCmdLine(pid int) (string, error) { + cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) + cmdLine, err := ioutil.ReadFile(cmdLineFile) + if err != nil { + return "", err + } + return string(cmdLine), nil +} diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go index 2334e27..b901182 100644 --- a/pkg/driver/utils.go +++ b/pkg/driver/utils.go @@ -10,6 +10,13 @@ import ( "google.golang.org/grpc" ) +func NewNodeServer(n *SeaweedFsDriver) *NodeServer { + + return &NodeServer{ + Driver: n, + } +} + func NewIdentityServer(d *SeaweedFsDriver) *IdentityServer { return &IdentityServer{ Driver: d, |
