diff options
Diffstat (limited to 'pkg/driver')
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 2 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 141 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 68 |
3 files changed, 166 insertions, 45 deletions
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index d3ffe8a..56ba98b 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -50,7 +50,7 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { } // CacheDirForRead should be always defined - we use temp dir in case it is not defined - // we need to use predictable cache path, because we need to clean it up on unpublish + // we need to use predictable cache path, because we need to clean it up on unstage cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID) // Final args diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index c0572a8..616c79b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -26,12 +26,12 @@ type NodeServer struct { var _ = csi.NodeServer(&NodeServer{}) -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { volumeID := req.GetVolumeId() // mount the fs here - targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() - glog.Infof("node target volume %s to %s", volumeID, targetPath) + glog.Infof("node stage volume %s to %s", volumeID, stagingTargetPath) // Check arguments if req.GetVolumeCapability() == nil { @@ -43,7 +43,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if targetPath == "" { + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -51,10 +51,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis volumeMutex.Lock() defer volumeMutex.Unlock() - // The volume has been publish. + // The volume has been staged. if _, ok := ns.volumes.Load(volumeID); ok { - glog.Infof("volume %s has been already published", volumeID) - return &csi.NodePublishVolumeResponse{}, nil + glog.Infof("volume %s has been already staged", volumeID) + return &csi.NodeStageVolumeResponse{}, nil } volContext := req.GetVolumeContext() @@ -62,14 +62,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) if err != nil { - // node publish is unsuccessfull + // node stage is unsuccessfull ns.removeVolumeMutex(volumeID) return nil, err } volume := NewVolume(volumeID, mounter) - if err := volume.Publish(targetPath); err != nil { - // node publish is unsuccessfull + if err := volume.Stage(stagingTargetPath); err != nil { + // node stage is unsuccessfull ns.removeVolumeMutex(volumeID) if os.IsPermission(err) { @@ -90,21 +90,63 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } ns.volumes.Store(volumeID, volume) - glog.Infof("volume %s successfully publish to %s", volumeID, targetPath) + glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil +} + +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() + + glog.Infof("node publish volume %s to %s", volumeID, targetPath) + + // Check arguments + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) { + // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") + } + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + } + + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet") + } + + // When pod uses a volume in read-only mode, k8s will automatically + // mount the volume as a read-only file system. + if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + glog.Infof("volume %s successfully published to %s", volumeID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() - glog.Infof("node unpublish volume %s from %s", volumeID, targetPath) - // Check arguments if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } + if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -119,18 +161,15 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu // make sure there is no any garbage _ = mount.CleanupMountPoint(targetPath, mountutil, true) - } else { - if err := volume.(*Volume).Unpublish(targetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } else { - ns.volumes.Delete(volumeID) - } + + return &csi.NodeUnpublishVolumeResponse{}, nil } - // remove mutex on successfull unpublish - ns.volumeMutexes.RemoveMutex(volumeID) + if err := volume.(*Volume).Unpublish(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - glog.Infof("volume %s successfully unpublish from %s", volumeID, targetPath) + glog.Infof("volume %s successfully unpublished from %s", volumeID, targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -145,11 +184,19 @@ func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { glog.V(3).Infof("node get capabilities") + return &csi.NodeGetCapabilitiesResponse{ Capabilities: []*csi.NodeServiceCapability{ { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, }, @@ -158,6 +205,46 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC }, nil } +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() + + glog.Infof("node unstage volume %s from %s", volumeID, stagingTargetPath) + + // Check arguments + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been staged", volumeID) + + // make sure there is no any garbage + _ = mount.CleanupMountPoint(stagingTargetPath, mountutil, true) + } else { + if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) + } + } + + // remove mutex on successfull unstage + ns.volumeMutexes.RemoveMutex(volumeID) + + glog.Infof("volume %s successfully unstaged from %s", volumeID, stagingTargetPath) + + return &csi.NodeUnstageVolumeResponse{}, nil +} + func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { volumeID := req.GetVolumeId() volumePath := req.GetVolumePath() @@ -192,11 +279,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV func (ns *NodeServer) NodeCleanup() { ns.volumes.Range(func(_, vol any) bool { v := vol.(*Volume) - if len(v.TargetPath) > 0 { - glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath) - err := v.Unpublish(v.TargetPath) + if len(v.StagedPath) > 0 { + glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) + err := v.Unstage(v.StagedPath) if err != nil { - glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err) + glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) } } return true @@ -211,7 +298,7 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) { ns.volumeMutexes.RemoveMutex(volumeID) } -func isVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool { +func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { mode := req.GetVolumeCapability().GetAccessMode().Mode readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 38d1308..ac0a80a 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -9,11 +9,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "k8s.io/mount-utils" ) type Volume struct { VolumeId string - TargetPath string + StagedPath string mounter Mounter unmounter Unmounter @@ -30,31 +31,55 @@ func NewVolume(volumeID string, mounter Mounter) *Volume { } } -func (vol *Volume) Publish(targetPath string) error { +func (vol *Volume) Stage(stagingTargetPath string) error { // check whether it can be mounted - if isMnt, err := checkMount(targetPath); err != nil { + if isMnt, err := checkMount(stagingTargetPath); err != nil { return err } else if isMnt { // try to unmount before mounting again - _ = mountutil.Unmount(targetPath) + _ = mountutil.Unmount(stagingTargetPath) } - if u, err := vol.mounter.Mount(targetPath); err == nil { - if vol.TargetPath != "" { - if vol.TargetPath == targetPath { - glog.Warningf("target path is already set to %s for volume %s", vol.TargetPath, vol.VolumeId) + if u, err := vol.mounter.Mount(stagingTargetPath); err == nil { + if vol.StagedPath != "" { + if vol.StagedPath == stagingTargetPath { + glog.Warningf("staged path is already set to %s for volume %s", vol.StagedPath, vol.VolumeId) } else { - glog.Warningf("target path is already set to %s and differs from %s for volume %s", vol.TargetPath, targetPath, vol.VolumeId) + glog.Warningf("staged path is already set to %s and differs from %s for volume %s", vol.StagedPath, stagingTargetPath, vol.VolumeId) } } - vol.TargetPath = targetPath + + vol.StagedPath = stagingTargetPath vol.unmounter = u + return nil } else { return err } } +func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly bool) error { + // check whether it can be mounted + if isMnt, err := checkMount(targetPath); err != nil { + return err + } else if isMnt { + // maybe already mounted? + return nil + } + + // Use bind mount to create an alias of the real mount point. + mountOptions := []string{"bind"} + if readOnly { + mountOptions = append(mountOptions, "ro") + } + + if err := mountutil.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil { + return err + } + + return nil +} + func (vol *Volume) Quota(sizeByte int64) error { target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -73,24 +98,33 @@ func (vol *Volume) Quota(sizeByte int64) error { } func (vol *Volume) Unpublish(targetPath string) error { - glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, targetPath) + // Try unmounting target path and deleting it. + if err := mount.CleanupMountPoint(targetPath, mountutil, true); err != nil { + return err + } + + return nil +} + +func (vol *Volume) Unstage(stagingTargetPath string) error { + glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath) if vol.unmounter == nil { - glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, targetPath) + glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath) return nil } - if targetPath != vol.TargetPath { - glog.Warningf("staging path %s differs for volume %s at %s", targetPath, vol.VolumeId, vol.TargetPath) + if stagingTargetPath != vol.StagedPath { + glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath) } if err := vol.unmounter.Unmount(); err != nil { - glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err) + glog.Errorf("error unmounting volume during unstage: %s, err: %v", stagingTargetPath, err) return err } - if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) { - glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, targetPath, err) + if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) { + glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, stagingTargetPath, err) return err } |
