diff options
| author | garenchan <garenchan23@gmail.com> | 2022-07-01 12:27:25 +0800 |
|---|---|---|
| committer | garenchan <garenchan23@gmail.com> | 2022-07-01 12:27:25 +0800 |
| commit | 96d415ad3e121518552629f31a7cbe6eee9c76e4 (patch) | |
| tree | ec08f9e968af823d73c16327b86f79707e198eab | |
| parent | 27586e9139d4bacf1d37525a2fb7726f472931f4 (diff) | |
| download | seaweedfs-csi-driver-96d415ad3e121518552629f31a7cbe6eee9c76e4.tar.xz seaweedfs-csi-driver-96d415ad3e121518552629f31a7cbe6eee9c76e4.zip | |
Fix error when managing multiple volumes
| -rw-r--r-- | pkg/driver/controllerserver.go | 8 | ||||
| -rw-r--r-- | pkg/driver/driver.go | 12 | ||||
| -rw-r--r-- | pkg/driver/mounter.go | 14 | ||||
| -rw-r--r-- | pkg/driver/mounter_seaweedfs.go | 45 | ||||
| -rw-r--r-- | pkg/driver/nodeserver.go | 33 |
5 files changed, 58 insertions, 54 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index b311732..6a8585c 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -49,9 +49,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } glog.V(4).Infof("params:%v", params) capacity := req.GetCapacityRange().GetRequiredBytes() - cs.Driver.Capacity = capacity - cs.Driver.DiskType = params["diskType"] - if capacity > 0 { glog.V(4).Infof("volume capacity: %d", capacity) params["volumeCapacity"] = strconv.FormatInt(capacity, 10) @@ -178,8 +175,11 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap } func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + volumeID := req.GetVolumeId() + glog.V(0).Infof("Controller expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes) - clientConn, err := grpc.Dial("passthrough:///unix://"+cs.Driver.mountSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + localSocket := GetLocalSocket(volumeID) + clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 00e1f35..d850dbd 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -30,8 +30,7 @@ type SeaweedFsDriver struct { nodeID string version string - endpoint string - mountSocket string + endpoint string vcap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability @@ -44,8 +43,6 @@ type SeaweedFsDriver struct { CacheDir string UidMap string GidMap string - Capacity int64 - DiskType string } func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver { @@ -54,12 +51,6 @@ func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver { util.LoadConfiguration("security", false) - montDirHash := util.HashToInt32([]byte(endpoint)) - if montDirHash < 0 { - montDirHash = -montDirHash - } - socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash) - n := &SeaweedFsDriver{ endpoint: endpoint, nodeID: nodeID, @@ -67,7 +58,6 @@ func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver { version: version, filers: pb.ServerAddresses(filer).ToAddresses(), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), - mountSocket: socket, } n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index e0a6283..2c019ed 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -20,8 +20,18 @@ type Mounter interface { Mount(target string) error } -func newMounter(path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { - return newSeaweedFsMounter(path, collection, readOnly, driver, volContext) +func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { + path, ok := volContext["path"] + if !ok { + path = fmt.Sprintf("/buckets/%s", volumeID) + } + + collection, ok := volContext["collection"] + if !ok { + collection = volumeID + } + + return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext) } func fuseMount(path string, command string, args []string) error { diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index b38fe31..54c10c1 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -2,12 +2,16 @@ package driver import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" + "strconv" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) // Implements Mounter type seaweedFsMounter struct { + volumeID string path string collection string readOnly bool @@ -19,8 +23,9 @@ const ( seaweedFsCmd = "weed" ) -func newSeaweedFsMounter(path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { +func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) { return &seaweedFsMounter{ + volumeID: volumeID, path: path, collection: collection, readOnly: readOnly, @@ -36,7 +41,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error { for _, address := range seaweedFs.driver.filers { filers = append(filers, string(address)) } - capacityMB := seaweedFs.driver.Capacity / 1024 / 1024 args := []string{ "-logtostderr=true", @@ -45,11 +49,10 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error { "-umask=000", fmt.Sprintf("-dir=%s", target), fmt.Sprintf("-collection=%s", seaweedFs.collection), - fmt.Sprintf("-collectionQuotaMB=%d", capacityMB), fmt.Sprintf("-filer=%s", strings.Join(filers, ",")), fmt.Sprintf("-filer.path=%s", seaweedFs.path), fmt.Sprintf("-cacheCapacityMB=%d", seaweedFs.driver.CacheSizeMB), - fmt.Sprintf("-localSocket=%s", seaweedFs.driver.mountSocket), + fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)), } // came from https://github.com/seaweedfs/seaweedfs-csi-driver/pull/12 @@ -63,6 +66,11 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error { args = append(args, fmt.Sprintf("-map.gid=%s", value)) case "replication": args = append(args, fmt.Sprintf("-replication=%s", value)) + case "diskType": + args = append(args, fmt.Sprintf("-disk=%s", value)) + case "volumeCapacity": + capacityMB := parseVolumeCapacity(value) + args = append(args, fmt.Sprintf("-collectionQuotaMB=%d", capacityMB)) } } @@ -83,13 +91,32 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error { args = append(args, fmt.Sprintf("-map.gid=%s", seaweedFs.driver.GidMap)) } - if seaweedFs.driver.DiskType != "" { - args = append(args, fmt.Sprintf("-disk=%s", seaweedFs.driver.DiskType)) - } - err := fuseMount(target, seaweedFsCmd, args) if err != nil { glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err) } return err } + +func GetLocalSocket(volumeID string) string { + montDirHash := util.HashToInt32([]byte(volumeID)) + if montDirHash < 0 { + montDirHash = -montDirHash + } + + socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash) + return socket +} + +func parseVolumeCapacity(volumeCapacity string) int64 { + var capacity int64 + + if vCap, err := strconv.ParseInt(volumeCapacity, 10, 64); err != nil { + glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err) + } else { + capacity = vCap + } + + capacityMB := capacity / 1024 / 1024 + return capacityMB +} diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 6c6fa19..d48875f 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -2,9 +2,7 @@ package driver import ( "context" - "fmt" "os" - "strconv" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -56,31 +54,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } volContext := req.GetVolumeContext() - - path, ok := volContext["path"] - if !ok { - path = fmt.Sprintf("/buckets/%s", volumeID) - } - - collection, ok := volContext["collection"] - if !ok { - collection = volumeID - } - - if diskType, ok := volContext["diskType"]; ok { - ns.Driver.DiskType = diskType - } - - if volumeCapacity, ok := volContext["volumeCapacity"]; ok { - vCap, err := strconv.ParseInt(volumeCapacity, 10, 64) - if err != nil { - glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err) - } else { - ns.Driver.Capacity = vCap - } - } - - mounter, err := newMounter(path, collection, req.GetReadonly(), ns.Driver, volContext) + mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext) if err != nil { return nil, err } @@ -173,8 +147,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { + volumeID := req.GetVolumeId() + glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes) - clientConn, err := grpc.Dial("passthrough:///unix://"+ns.Driver.mountSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + localSocket := GetLocalSocket(volumeID) + clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } |
