diff options
| author | Viktor Kuzmin <kvaster@gmail.com> | 2023-08-06 23:23:54 +0300 |
|---|---|---|
| committer | Chris Lu <chrislusf@users.noreply.github.com> | 2023-08-07 10:22:05 -0700 |
| commit | 251eb9b4b8dc52ca55f3903b64f4f7027343fd5b (patch) | |
| tree | eb92e65f3dcea999d094bf5e446d57d0d407ec30 /pkg/driver | |
| parent | d84db4bbebec5d8c0d0e20dbf9deffdd24ba1151 (diff) | |
| download | seaweedfs-csi-driver-251eb9b4b8dc52ca55f3903b64f4f7027343fd5b.tar.xz seaweedfs-csi-driver-251eb9b4b8dc52ca55f3903b64f4f7027343fd5b.zip | |
Graceful stop with mounts cleanup
Diffstat (limited to 'pkg/driver')
| -rw-r--r-- | pkg/driver/driver.go | 25 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 14 | ||||
| -rw-r--r-- | pkg/driver/server.go | 7 | ||||
| -rw-r--r-- | pkg/driver/volume.go | 21 |
4 files changed, 60 insertions, 7 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index fcef08a..12954e9 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -3,6 +3,8 @@ package driver import ( "fmt" "os" + "os/signal" + "syscall" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality" @@ -90,12 +92,31 @@ func (n *SeaweedFsDriver) initClient() error { } func (n *SeaweedFsDriver) Run() { + glog.Info("starting") + + controller := NewControllerServer(n) + node := NewNodeServer(n) + s := NewNonBlockingGRPCServer() s.Start(n.endpoint, NewIdentityServer(n), - NewControllerServer(n), - NewNodeServer(n)) + controller, + node) + s.Wait() + + stopChan := make(chan os.Signal) + signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) + <-stopChan + + glog.Infof("stopping") + + s.Stop() s.Wait() + + glog.Infof("node cleanup") + node.NodeCleanup() + + glog.Infof("stopped") } func (n *SeaweedFsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode { diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 56d02c6..f92769d 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -265,6 +265,20 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV return &csi.NodeExpandVolumeResponse{}, nil } +func (ns *NodeServer) NodeCleanup() { + ns.volumes.Range(func(_, vol any) bool { + v := vol.(*Volume) + if len(v.StagedPath) > 0 { + glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath) + err := v.Unstage(v.StagedPath) + if err != nil { + glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err) + } + } + return true + }) +} + func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex { return ns.volumeMutexes.GetMutex(volumeID) } diff --git a/pkg/driver/server.go b/pkg/driver/server.go index 99580ae..b998533 100644 --- a/pkg/driver/server.go +++ b/pkg/driver/server.go @@ -34,10 +34,12 @@ type nonBlockingGRPCServer struct { } func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { - s.wg.Add(1) - go s.serve(endpoint, ids, cs, ns) + go func() { + defer s.wg.Done() + s.serve(endpoint, ids, cs, ns) + }() return } @@ -92,5 +94,4 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c glog.Infof("Listening for connections on address: %#v", listener.Addr()) server.Serve(listener) - } diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go index 5358700..e4e2cf3 100644 --- a/pkg/driver/volume.go +++ b/pkg/driver/volume.go @@ -13,7 +13,8 @@ import ( ) type Volume struct { - VolumeId string + VolumeId string + StagedPath string mounter Mounter unmounter Unmounter @@ -40,7 +41,17 @@ func (vol *Volume) Stage(stagingTargetPath string) error { } if u, err := vol.mounter.Mount(stagingTargetPath); err == nil { + if vol.StagedPath != "" { + if vol.StagedPath == stagingTargetPath { + glog.Warningf("staged path is already set to %s for volume %s", vol.StagedPath, vol.VolumeId) + } else { + glog.Warningf("staged path is already set to %s and differs from %s for volume %s", vol.StagedPath, stagingTargetPath, vol.VolumeId) + } + } + + vol.StagedPath = stagingTargetPath vol.unmounter = u + return nil } else { return err @@ -99,15 +110,21 @@ func (vol *Volume) Unstage(stagingTargetPath string) error { glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath) if vol.unmounter == nil { - glog.Errorf("volume is not mounted: %s, path", vol.VolumeId, stagingTargetPath) + glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath) return nil } + if stagingTargetPath != vol.StagedPath { + glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath) + } + if err := vol.unmounter.Unmount(); err != nil { + glog.Errorf("error unmounting volume during unstage: %s, err: %v", err) return err } if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) { + glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, stagingTargetPath, err) return err } |
