aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Kuzmin <kvaster@gmail.com>2023-08-06 23:23:54 +0300
committerChris Lu <chrislusf@users.noreply.github.com>2023-08-07 10:22:05 -0700
commit251eb9b4b8dc52ca55f3903b64f4f7027343fd5b (patch)
treeeb92e65f3dcea999d094bf5e446d57d0d407ec30
parentd84db4bbebec5d8c0d0e20dbf9deffdd24ba1151 (diff)
downloadseaweedfs-csi-driver-251eb9b4b8dc52ca55f3903b64f4f7027343fd5b.tar.xz
seaweedfs-csi-driver-251eb9b4b8dc52ca55f3903b64f4f7027343fd5b.zip
Graceful stop with mounts cleanup
-rw-r--r--pkg/driver/driver.go25
-rw-r--r--pkg/driver/nodeserver.go14
-rw-r--r--pkg/driver/server.go7
-rw-r--r--pkg/driver/volume.go21
4 files changed, 60 insertions, 7 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index fcef08a..12954e9 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -3,6 +3,8 @@ package driver
import (
"fmt"
"os"
+ "os/signal"
+ "syscall"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
@@ -90,12 +92,31 @@ func (n *SeaweedFsDriver) initClient() error {
}
func (n *SeaweedFsDriver) Run() {
+ glog.Info("starting")
+
+ controller := NewControllerServer(n)
+ node := NewNodeServer(n)
+
s := NewNonBlockingGRPCServer()
s.Start(n.endpoint,
NewIdentityServer(n),
- NewControllerServer(n),
- NewNodeServer(n))
+ controller,
+ node)
+ s.Wait()
+
+ stopChan := make(chan os.Signal)
+ signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
+ <-stopChan
+
+ glog.Infof("stopping")
+
+ s.Stop()
s.Wait()
+
+ glog.Infof("node cleanup")
+ node.NodeCleanup()
+
+ glog.Infof("stopped")
}
func (n *SeaweedFsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 56d02c6..f92769d 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -265,6 +265,20 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return &csi.NodeExpandVolumeResponse{}, nil
}
+func (ns *NodeServer) NodeCleanup() {
+ ns.volumes.Range(func(_, vol any) bool {
+ v := vol.(*Volume)
+ if len(v.StagedPath) > 0 {
+ glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.StagedPath)
+ err := v.Unstage(v.StagedPath)
+ if err != nil {
+ glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.StagedPath, err)
+ }
+ }
+ return true
+ })
+}
+
func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.Mutex {
return ns.volumeMutexes.GetMutex(volumeID)
}
diff --git a/pkg/driver/server.go b/pkg/driver/server.go
index 99580ae..b998533 100644
--- a/pkg/driver/server.go
+++ b/pkg/driver/server.go
@@ -34,10 +34,12 @@ type nonBlockingGRPCServer struct {
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
-
s.wg.Add(1)
- go s.serve(endpoint, ids, cs, ns)
+ go func() {
+ defer s.wg.Done()
+ s.serve(endpoint, ids, cs, ns)
+ }()
return
}
@@ -92,5 +94,4 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
glog.Infof("Listening for connections on address: %#v", listener.Addr())
server.Serve(listener)
-
}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index 5358700..e4e2cf3 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -13,7 +13,8 @@ import (
)
type Volume struct {
- VolumeId string
+ VolumeId string
+ StagedPath string
mounter Mounter
unmounter Unmounter
@@ -40,7 +41,17 @@ func (vol *Volume) Stage(stagingTargetPath string) error {
}
if u, err := vol.mounter.Mount(stagingTargetPath); err == nil {
+ if vol.StagedPath != "" {
+ if vol.StagedPath == stagingTargetPath {
+ glog.Warningf("staged path is already set to %s for volume %s", vol.StagedPath, vol.VolumeId)
+ } else {
+ glog.Warningf("staged path is already set to %s and differs from %s for volume %s", vol.StagedPath, stagingTargetPath, vol.VolumeId)
+ }
+ }
+
+ vol.StagedPath = stagingTargetPath
vol.unmounter = u
+
return nil
} else {
return err
@@ -99,15 +110,21 @@ func (vol *Volume) Unstage(stagingTargetPath string) error {
glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, stagingTargetPath)
if vol.unmounter == nil {
- glog.Errorf("volume is not mounted: %s, path", vol.VolumeId, stagingTargetPath)
+ glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, stagingTargetPath)
return nil
}
+ if stagingTargetPath != vol.StagedPath {
+ glog.Warningf("staging path %s differs for volume %s at %s", stagingTargetPath, vol.VolumeId, vol.StagedPath)
+ }
+
if err := vol.unmounter.Unmount(); err != nil {
+ glog.Errorf("error unmounting volume during unstage: %s, err: %v", err)
return err
}
if err := os.Remove(stagingTargetPath); err != nil && !os.IsNotExist(err) {
+ glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, stagingTargetPath, err)
return err
}