diff options
Diffstat (limited to 'pkg/driver')
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 2 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 141 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 68 |
3 files changed, 45 insertions, 166 deletions
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index 56ba98b..d3ffe8a 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -50,7 +50,7 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) { } // CacheDirForRead should be always defined - we use temp dir in case it is not defined - // we need to use predictable cache path, because we need to clean it up on unstage + // we need to use predictable cache path, because we need to clean it up on unpublish cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID) // Final args diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 616c79b..c0572a8 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -26,12 +26,12 @@ type NodeServer struct { var _ = csi.NodeServer(&NodeServer{}) -func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() // mount the fs here - stagingTargetPath := req.GetStagingTargetPath() + targetPath := req.GetTargetPath() - glog.Infof("node stage volume %s to %s", volumeID, stagingTargetPath) + glog.Infof("node target volume %s to %s", volumeID, targetPath) // Check arguments if req.GetVolumeCapability() == nil { @@ -43,7 +43,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if stagingTargetPath == "" { + if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -51,10 +51,10 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol volumeMutex.Lock() defer volumeMutex.Unlock() - // The volume has been staged. + // The volume has been publish. if _, ok := ns.volumes.Load(volumeID); ok { - glog.Infof("volume %s has been already staged", volumeID) - return &csi.NodeStageVolumeResponse{}, nil + glog.Infof("volume %s has been already published", volumeID) + return &csi.NodePublishVolumeResponse{}, nil } volContext := req.GetVolumeContext() @@ -62,14 +62,14 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) if err != nil { - // node stage is unsuccessfull + // node publish is unsuccessfull ns.removeVolumeMutex(volumeID) return nil, err } volume := NewVolume(volumeID, mounter) - if err := volume.Stage(stagingTargetPath); err != nil { - // node stage is unsuccessfull + if err := volume.Publish(targetPath); err != nil { + // node publish is unsuccessfull ns.removeVolumeMutex(volumeID) if os.IsPermission(err) { @@ -90,63 +90,21 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } ns.volumes.Store(volumeID, volume) - glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) + glog.Infof("volume %s successfully publish to %s", volumeID, targetPath) - return &csi.NodeStageVolumeResponse{}, nil -} - -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { - volumeID := req.GetVolumeId() - targetPath := req.GetTargetPath() - stagingTargetPath := req.GetStagingTargetPath() - - glog.Infof("node publish volume %s to %s", volumeID, targetPath) - - // Check arguments - if req.GetVolumeCapability() == nil { - return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") - } - if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) { - // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") - } - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") - } - if targetPath == "" { - return nil, status.Error(codes.InvalidArgument, "Target path missing in request") - } - if stagingTargetPath == "" { - return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") - } - - volumeMutex := ns.getVolumeMutex(volumeID) - volumeMutex.Lock() - defer volumeMutex.Unlock() - - volume, ok := ns.volumes.Load(volumeID) - if !ok { - return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet") - } - - // When pod uses a volume in read-only mode, k8s will automatically - // mount the volume as a read-only file system. - if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - glog.Infof("volume %s successfully published to %s", volumeID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() + glog.Infof("node unpublish volume %s from %s", volumeID, targetPath) + // Check arguments if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -161,15 +119,18 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu // make sure there is no any garbage _ = mount.CleanupMountPoint(targetPath, mountutil, true) - - return &csi.NodeUnpublishVolumeResponse{}, nil + } else { + if err := volume.(*Volume).Unpublish(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) + } } - if err := volume.(*Volume).Unpublish(targetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + // remove mutex on successfull unpublish + ns.volumeMutexes.RemoveMutex(volumeID) - glog.Infof("volume %s successfully unpublished from %s", volumeID, targetPath) + glog.Infof("volume %s successfully unpublish from %s", volumeID, targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -184,19 +145,11 @@ func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { glog.V(3).Infof("node get capabilities") - return &csi.NodeGetCapabilitiesResponse{ Capabilities: []*csi.NodeServiceCapability{ { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ - Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, - }, - }, - }, - { - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, }, @@ -205,46 +158,6 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC }, nil } -func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - volumeID := req.GetVolumeId() - stagingTargetPath := req.GetStagingTargetPath() - - glog.Infof("node unstage volume %s from %s", volumeID, stagingTargetPath) - - // Check arguments - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") - } - if stagingTargetPath == "" { - return nil, status.Error(codes.InvalidArgument, "Target path missing in request") - } - - volumeMutex := ns.getVolumeMutex(volumeID) - volumeMutex.Lock() - defer volumeMutex.Unlock() - - volume, ok := ns.volumes.Load(volumeID) - if !ok { - glog.Warningf("volume %s hasn't been staged", volumeID) - - // make sure there is no any garbage - _ = mount.CleanupMountPoint(stagingTargetPath, mountutil, true) - } else { - if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } else { - ns.volumes.Delete(volumeID) - } - } - - // remove mutex on successfull unstage - ns.volumeMutexes.RemoveMutex(volumeID) - - glog.Infof("volume %s successfully unstaged from %s", volumeID, stagingTargetPath) - - return &csi.NodeUnstageVolumeResponse{}, nil -} - func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { volumeID := req.GetVolumeId() volumePath := req.GetVolumePath() @@ -279,11 +192,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV func (ns *NodeServer) NodeCleanup() { ns.volumes.Range(func(_, vol any) bool { v := vol.(*Volume) - if len(v.StagedPath) > 0 { - glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) - err := v.Unstage(v.StagedPath) + if len(v.TargetPath) > 0 { + glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath) + err := v.Unpublish(v.TargetPath) if err != nil { - glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) + glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err) } } return true @@ -298,7 +211,7 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) { ns.volumeMutexes.RemoveMutex(volumeID) } -func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { +func isVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool { mode := req.GetVolumeCapability().GetAccessMode().Mode readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index ac0a80a..38d1308 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -9,12 +9,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "k8s.io/mount-utils" ) type Volume struct { VolumeId string - StagedPath string + TargetPath string mounter Mounter unmounter Unmounter @@ -31,55 +30,31 @@ func NewVolume(volumeID string, mounter Mounter) *Volume { } } -func (vol *Volume) Stage(stagingTargetPath string) error { +func (vol *Volume) Publish(targetPath string) error { // check whether it can be mounted - if isMnt, err := checkMount(stagingTargetPath); err != nil { + if isMnt, err := checkMount(targetPath); err != nil { return err } else if isMnt { // try to unmount before mounting again - _ = mountutil.Unmount(stagingTargetPath) + _ = mountutil.Unmount(targetPath) } - if u, err := vol.mounter.Mount(stagingTargetPath); err == nil { - if vol.StagedPath != "" { - if vol.StagedPath == stagingTargetPath { - glog.Warningf("staged path is already set to %s for volume %s", vol.StagedPath, vol.VolumeId) + if u, err := vol.mounter.Mount(targetPath); err == nil { + if vol.TargetPath != "" { + if vol.TargetPath == targetPath { + glog.Warningf("target path is already set to %s for volume %s", vol.TargetPath, vol.VolumeId) } else { - glog.Warningf("staged path is already set to %s and differs from %s for volume %s", vol.StagedPath, stagingTargetPath, vol.VolumeId) + glog.Warningf("target path is already set to %s and differs from %s for volume %s", vol.TargetPath, targetPath, vol.VolumeId) } } - - vol.StagedPath = stagingTargetPath + vol.TargetPath = targetPath vol.unmounter = u - return nil } else { return err } } -func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly bool) error { - // check whether it can be mounted - if isMnt, err := checkMount(targetPath); err != nil { - return err - } else if isMnt { - // maybe already mounted? - return nil - } - - // Use bind mount to create an alias of the real mount point. - mountOptions := []string{"bind"} - if readOnly { - mountOptions = append(mountOptions, "ro") - } - - if err := mountutil.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil { - return err - } - - return nil -} - func (vol *Volume) Quota(sizeByte int64) error { target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -98,33 +73,24 @@ func (vol *Volume) Quota(sizeByte int64) error { } func (vol *Volume) Unpublish(targetPath string) error { - // Try unmounting target path and deleting it. - if err := mount.CleanupMountPoint(targetPath, mountutil, true); err != nil { - return err - } - - return nil -} - -func (vol *Volume) Unstage(stagingTargetPath string) error { - glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath) + glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, targetPath) if vol.unmounter == nil { - glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath) + glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, targetPath) return nil } - if stagingTargetPath != vol.StagedPath { - glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath) + if targetPath != vol.TargetPath { + glog.Warningf("staging path %s differs for volume %s at %s", targetPath, vol.VolumeId, vol.TargetPath) } if err := vol.unmounter.Unmount(); err != nil { - glog.Errorf("error unmounting volume during unstage: %s, err: %v", stagingTargetPath, err) + glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err) return err } - if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) { - glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, stagingTargetPath, err) + if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) { + glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, targetPath, err) return err } |
