aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/controllerserver.go5
-rw-r--r--pkg/driver/mounter_seaweedfs.go21
-rw-r--r--pkg/driver/nodeserver.go11
-rw-r--r--pkg/driver/volume.go2
-rw-r--r--pkg/k8s/client.go42
5 files changed, 53 insertions, 28 deletions
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index 0d8b671..1bd2169 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"io"
- "strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -48,10 +47,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
glog.V(4).Infof("params:%v", params)
capacity := req.GetCapacityRange().GetRequiredBytes()
- if capacity > 0 {
- glog.V(4).Infof("volume capacity: %d", capacity)
- params["volumeCapacity"] = strconv.FormatInt(capacity, 10)
- }
if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil {
return nil, fmt.Errorf("error setting bucket metadata: %v", err)
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index 8f4ab9e..56ba98b 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -4,7 +4,6 @@ import (
"fmt"
"os"
"path/filepath"
- "strconv"
"strings"
"github.com/seaweedfs/seaweedfs-csi-driver/pkg/datalocality"
@@ -69,12 +68,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
args = append(args, "-readOnly")
}
- // Handle volumeCapacity from controllerserver.go:51
- if value, ok := seaweedFs.volContext["volumeCapacity"]; ok {
- capacityMB := parseVolumeCapacity(value)
- args = append(args, fmt.Sprintf("-collectionQuotaMB=%d", capacityMB))
- }
-
// Values for override-able args
// Whitelist for merging with volContext
argsMap := map[string]string{
@@ -126,7 +119,6 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
// Explicitly ignored volContext args e.g. handled somewhere else
ignoreArgs := []string{
- "volumeCapacity",
"dataLocality",
}
@@ -186,19 +178,6 @@ func GetLocalSocket(volumeID string) string {
return socket
}
-func parseVolumeCapacity(volumeCapacity string) int64 {
- var capacity int64
-
- if vCap, err := strconv.ParseInt(volumeCapacity, 10, 64); err != nil {
- glog.Errorf("volumeCapacity %s can not be parsed to Int64, err is: %v", volumeCapacity, err)
- } else {
- capacity = vCap
- }
-
- capacityMB := capacity / 1024 / 1024
- return capacityMB
-}
-
func in_arr(arr []string, val string) bool {
for _, v := range arr {
if val == v {
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 130c2a5..616c79b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/seaweedfs/seaweedfs-csi-driver/pkg/k8s"
"github.com/seaweedfs/seaweedfs/weed/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -79,6 +80,14 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
return nil, status.Error(codes.Internal, err.Error())
}
+ //k8s api get Capacity
+ if capacity, err := k8s.GetVolumeCapacity(volumeID); err == nil {
+ if err := volume.Quota(capacity); err != nil {
+ return nil, err
+ }
+ } else {
+ return nil, err
+ }
ns.volumes.Store(volumeID, volume)
glog.Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)
@@ -259,7 +268,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
defer volumeMutex.Unlock()
if volume, ok := ns.volumes.Load(volumeID); ok {
- if err := volume.(*Volume).Expand(requiredBytes); err != nil {
+ if err := volume.(*Volume).Quota(requiredBytes); err != nil {
return nil, err
}
}
diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go
index a6d7ddb..ac0a80a 100644
--- a/pkg/driver/volume.go
+++ b/pkg/driver/volume.go
@@ -80,7 +80,7 @@ func (vol *Volume) Publish(stagingTargetPath string, targetPath string, readOnly
return nil
}
-func (vol *Volume) Expand(sizeByte int64) error {
+func (vol *Volume) Quota(sizeByte int64) error {
target := fmt.Sprintf("passthrough:///unix://%s", vol.localSocket)
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go
new file mode 100644
index 0000000..8861d0a
--- /dev/null
+++ b/pkg/k8s/client.go
@@ -0,0 +1,42 @@
+package k8s
+
+import (
+ "context"
+ "time"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+)
+
+func NewInCluster() (*kubernetes.Clientset, error) {
+ //creates the in-cluster config
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ panic(err.Error())
+ }
+
+ // creates the clientset
+ clientset, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ panic(err.Error())
+ }
+ return clientset, nil
+}
+
+func GetVolumeCapacity(volumeId string) (int64, error) {
+ client, err := NewInCluster()
+ if err != nil {
+ return 0, err
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ if volume, err := client.CoreV1().PersistentVolumes().Get(ctx, volumeId, metav1.GetOptions{}); err != nil {
+ return 0, err
+ } else {
+ storage := volume.Spec.Capacity.Storage()
+ capacity, _ := storage.AsInt64()
+ return capacity, nil
+ }
+}