aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2022-07-01 02:19:02 -0700
committerGitHub <noreply@github.com>2022-07-01 02:19:02 -0700
commit3a5a3406256bc10263f0815baef112a19b62556b (patch)
treeec08f9e968af823d73c16327b86f79707e198eab
parent27586e9139d4bacf1d37525a2fb7726f472931f4 (diff)
parent96d415ad3e121518552629f31a7cbe6eee9c76e4 (diff)
downloadseaweedfs-csi-driver-3a5a3406256bc10263f0815baef112a19b62556b.tar.xz
seaweedfs-csi-driver-3a5a3406256bc10263f0815baef112a19b62556b.zip
Merge pull request #66 from garenchan/ck-dev1
Fix error when managing multiple volumes
-rw-r--r--pkg/driver/controllerserver.go8
-rw-r--r--pkg/driver/driver.go12
-rw-r--r--pkg/driver/mounter.go14
-rw-r--r--pkg/driver/mounter_seaweedfs.go45
-rw-r--r--pkg/driver/nodeserver.go33
5 files changed, 58 insertions, 54 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index b311732..6a8585c 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -49,9 +49,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
glog.V(4).Infof("params:%v", params)
capacity := req.GetCapacityRange().GetRequiredBytes()
- cs.Driver.Capacity = capacity
- cs.Driver.DiskType = params["diskType"]
-
if capacity > 0 {
glog.V(4).Infof("volume capacity: %d", capacity)
params["volumeCapacity"] = strconv.FormatInt(capacity, 10)
@@ -178,8 +175,11 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
}
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ glog.V(0).Infof("Controller expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
- clientConn, err := grpc.Dial("passthrough:///unix://"+cs.Driver.mountSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ localSocket := GetLocalSocket(volumeID)
+ clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 00e1f35..d850dbd 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -30,8 +30,7 @@ type SeaweedFsDriver struct {
nodeID string
version string
- endpoint string
- mountSocket string
+ endpoint string
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
@@ -44,8 +43,6 @@ type SeaweedFsDriver struct {
CacheDir string
UidMap string
GidMap string
- Capacity int64
- DiskType string
}
func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
@@ -54,12 +51,6 @@ func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
util.LoadConfiguration("security", false)
- montDirHash := util.HashToInt32([]byte(endpoint))
- if montDirHash < 0 {
- montDirHash = -montDirHash
- }
- socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash)
-
n := &SeaweedFsDriver{
endpoint: endpoint,
nodeID: nodeID,
@@ -67,7 +58,6 @@ func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
version: version,
filers: pb.ServerAddresses(filer).ToAddresses(),
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
- mountSocket: socket,
}
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go
index e0a6283..2c019ed 100644
--- a/pkg/driver/mounter.go
+++ b/pkg/driver/mounter.go
@@ -20,8 +20,18 @@ type Mounter interface {
Mount(target string) error
}
-func newMounter(path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
- return newSeaweedFsMounter(path, collection, readOnly, driver, volContext)
+func newMounter(volumeID string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
+ path, ok := volContext["path"]
+ if !ok {
+ path = fmt.Sprintf("/buckets/%s", volumeID)
+ }
+
+ collection, ok := volContext["collection"]
+ if !ok {
+ collection = volumeID
+ }
+
+ return newSeaweedFsMounter(volumeID, path, collection, readOnly, driver, volContext)
}
func fuseMount(path string, command string, args []string) error {
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index b38fe31..54c10c1 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -2,12 +2,16 @@ package driver
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "strconv"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// Implements Mounter
type seaweedFsMounter struct {
+ volumeID string
path string
collection string
readOnly bool
@@ -19,8 +23,9 @@ const (
seaweedFsCmd = "weed"
)
-func newSeaweedFsMounter(path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
+func newSeaweedFsMounter(volumeID string, path string, collection string, readOnly bool, driver *SeaweedFsDriver, volContext map[string]string) (Mounter, error) {
return &seaweedFsMounter{
+ volumeID: volumeID,
path: path,
collection: collection,
readOnly: readOnly,
@@ -36,7 +41,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
for _, address := range seaweedFs.driver.filers {
filers = append(filers, string(address))
}
- capacityMB := seaweedFs.driver.Capacity / 1024 / 1024
args := []string{
"-logtostderr=true",
@@ -45,11 +49,10 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
"-umask=000",
fmt.Sprintf("-dir=%s", target),
fmt.Sprintf("-collection=%s", seaweedFs.collection),
- fmt.Sprintf("-collectionQuotaMB=%d", capacityMB),
fmt.Sprintf("-filer=%s", strings.Join(filers, ",")),
fmt.Sprintf("-filer.path=%s", seaweedFs.path),
fmt.Sprintf("-cacheCapacityMB=%d", seaweedFs.driver.CacheSizeMB),
- fmt.Sprintf("-localSocket=%s", seaweedFs.driver.mountSocket),
+ fmt.Sprintf("-localSocket=%s", GetLocalSocket(seaweedFs.volumeID)),
}
// came from https://github.com/seaweedfs/seaweedfs-csi-driver/pull/12
@@ -63,6 +66,11 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
args = append(args, fmt.Sprintf("-map.gid=%s", value))
case "replication":
args = append(args, fmt.Sprintf("-replication=%s", value))
+ case "diskType":
+ args = append(args, fmt.Sprintf("-disk=%s", value))
+ case "volumeCapacity":
+ capacityMB := parseVolumeCapacity(value)
+ args = append(args, fmt.Sprintf("-collectionQuotaMB=%d", capacityMB))
}
}
@@ -83,13 +91,32 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
args = append(args, fmt.Sprintf("-map.gid=%s", seaweedFs.driver.GidMap))
}
- if seaweedFs.driver.DiskType != "" {
- args = append(args, fmt.Sprintf("-disk=%s", seaweedFs.driver.DiskType))
- }
-
err := fuseMount(target, seaweedFsCmd, args)
if err != nil {
glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
}
return err
}
+
+func GetLocalSocket(volumeID string) string {
+ montDirHash := util.HashToInt32([]byte(volumeID))
+ if montDirHash < 0 {
+ montDirHash = -montDirHash
+ }
+
+ socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash)
+ return socket
+}
+
+func parseVolumeCapacity(volumeCapacity string) int64 {
+ var capacity int64
+
+ if vCap, err := strconv.ParseInt(volumeCapacity, 10, 64); err != nil {
+ glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err)
+ } else {
+ capacity = vCap
+ }
+
+ capacityMB := capacity / 1024 / 1024
+ return capacityMB
+}
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 6c6fa19..d48875f 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -2,9 +2,7 @@ package driver
import (
"context"
- "fmt"
"os"
- "strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -56,31 +54,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
volContext := req.GetVolumeContext()
-
- path, ok := volContext["path"]
- if !ok {
- path = fmt.Sprintf("/buckets/%s", volumeID)
- }
-
- collection, ok := volContext["collection"]
- if !ok {
- collection = volumeID
- }
-
- if diskType, ok := volContext["diskType"]; ok {
- ns.Driver.DiskType = diskType
- }
-
- if volumeCapacity, ok := volContext["volumeCapacity"]; ok {
- vCap, err := strconv.ParseInt(volumeCapacity, 10, 64)
- if err != nil {
- glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err)
- } else {
- ns.Driver.Capacity = vCap
- }
- }
-
- mounter, err := newMounter(path, collection, req.GetReadonly(), ns.Driver, volContext)
+ mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext)
if err != nil {
return nil, err
}
@@ -173,8 +147,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+ volumeID := req.GetVolumeId()
+ glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
- clientConn, err := grpc.Dial("passthrough:///unix://"+ns.Driver.mountSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ localSocket := GetLocalSocket(volumeID)
+ clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}