diff options
| author | chrislu <chrislu@roblox.com> | 2024-01-18 08:06:24 -0800 |
|---|---|---|
| committer | chrislu <chrislu@roblox.com> | 2024-01-18 08:06:24 -0800 |
| commit | b982553bf132fa850841f9997e182c6112dd22f3 (patch) | |
| tree | c94ea66e80a85b7ea36321f590e7549f5d4a54cf /pkg/driver/nodeserver.go | |
| parent | 1c7e9db74fa8719d84804b30072af1bb680d8332 (diff) | |
| download | seaweedfs-csi-driver-b982553bf132fa850841f9997e182c6112dd22f3.tar.xz seaweedfs-csi-driver-b982553bf132fa850841f9997e182c6112dd22f3.zip | |
Revert "chore(seaweedfs-csi-driver): delete unnecessary stage"
This reverts commit 44283c0ffe56e3180dae5b93801d07a3d621d355.
Diffstat (limited to 'pkg/driver/nodeserver.go')
| -rw-r--r-- | pkg/driver/nodeserver.go | 141 |
1 files changed, 114 insertions, 27 deletions
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index c0572a8..616c79b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -26,12 +26,12 @@ type NodeServer struct { var _ = csi.NodeServer(&NodeServer{}) -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { volumeID := req.GetVolumeId() // mount the fs here - targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() - glog.Infof("node target volume %s to %s", volumeID, targetPath) + glog.Infof("node stage volume %s to %s", volumeID, stagingTargetPath) // Check arguments if req.GetVolumeCapability() == nil { @@ -43,7 +43,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if targetPath == "" { + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -51,10 +51,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis volumeMutex.Lock() defer volumeMutex.Unlock() - // The volume has been publish. + // The volume has been staged. if _, ok := ns.volumes.Load(volumeID); ok { - glog.Infof("volume %s has been already published", volumeID) - return &csi.NodePublishVolumeResponse{}, nil + glog.Infof("volume %s has been already staged", volumeID) + return &csi.NodeStageVolumeResponse{}, nil } volContext := req.GetVolumeContext() @@ -62,14 +62,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) if err != nil { - // node publish is unsuccessfull + // node stage is unsuccessfull ns.removeVolumeMutex(volumeID) return nil, err } volume := NewVolume(volumeID, mounter) - if err := volume.Publish(targetPath); err != nil { - // node publish is unsuccessfull + if err := volume.Stage(stagingTargetPath); err != nil { + // node stage is unsuccessfull ns.removeVolumeMutex(volumeID) if os.IsPermission(err) { @@ -90,21 +90,63 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } ns.volumes.Store(volumeID, volume) - glog.Infof("volume %s successfully publish to %s", volumeID, targetPath) + glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil +} + +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() + + glog.Infof("node publish volume %s to %s", volumeID, targetPath) + + // Check arguments + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) { + // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") + } + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + } + + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet") + } + + // When pod uses a volume in read-only mode, k8s will automatically + // mount the volume as a read-only file system. + if err := volume.(*Volume).Publish(stagingTargetPath, targetPath, req.GetReadonly()); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + glog.Infof("volume %s successfully published to %s", volumeID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() - glog.Infof("node unpublish volume %s from %s", volumeID, targetPath) - // Check arguments if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } + if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -119,18 +161,15 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu // make sure there is no any garbage _ = mount.CleanupMountPoint(targetPath, mountutil, true) - } else { - if err := volume.(*Volume).Unpublish(targetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } else { - ns.volumes.Delete(volumeID) - } + + return &csi.NodeUnpublishVolumeResponse{}, nil } - // remove mutex on successfull unpublish - ns.volumeMutexes.RemoveMutex(volumeID) + if err := volume.(*Volume).Unpublish(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - glog.Infof("volume %s successfully unpublish from %s", volumeID, targetPath) + glog.Infof("volume %s successfully unpublished from %s", volumeID, targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -145,11 +184,19 @@ func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { glog.V(3).Infof("node get capabilities") + return &csi.NodeGetCapabilitiesResponse{ Capabilities: []*csi.NodeServiceCapability{ { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, }, @@ -158,6 +205,46 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC }, nil } +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() + + glog.Infof("node unstage volume %s from %s", volumeID, stagingTargetPath) + + // Check arguments + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been staged", volumeID) + + // make sure there is no any garbage + _ = mount.CleanupMountPoint(stagingTargetPath, mountutil, true) + } else { + if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) + } + } + + // remove mutex on successfull unstage + ns.volumeMutexes.RemoveMutex(volumeID) + + glog.Infof("volume %s successfully unstaged from %s", volumeID, stagingTargetPath) + + return &csi.NodeUnstageVolumeResponse{}, nil +} + func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { volumeID := req.GetVolumeId() volumePath := req.GetVolumePath() @@ -192,11 +279,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV func (ns *NodeServer) NodeCleanup() { ns.volumes.Range(func(_, vol any) bool { v := vol.(*Volume) - if len(v.TargetPath) > 0 { - glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath) - err := v.Unpublish(v.TargetPath) + if len(v.StagedPath) > 0 { + glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) + err := v.Unstage(v.StagedPath) if err != nil { - glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err) + glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) } } return true @@ -211,7 +298,7 @@ func (ns *NodeServer) removeVolumeMutex(volumeID string) { ns.volumeMutexes.RemoveMutex(volumeID) } -func isVolumeReadOnly(req *csi.NodePublishVolumeRequest) bool { +func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { mode := req.GetVolumeCapability().GetAccessMode().Mode readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ |
